2920 lines
83 KiB
C++
2920 lines
83 KiB
C++
/*
|
|
Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
|
|
|
|
This program is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License, version 2.0,
|
|
as published by the Free Software Foundation.
|
|
|
|
This program is also distributed with certain software (including
|
|
but not limited to OpenSSL) that is licensed under separate terms,
|
|
as designated in a particular file or component or in included license
|
|
documentation. The authors of MySQL hereby grant you an additional
|
|
permission to link the program and your derivative works with the
|
|
separately licensed software that they have included with MySQL.
|
|
|
|
This program is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License, version 2.0, for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program; if not, write to the Free Software
|
|
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
|
|
*/
|
|
|
|
#include <ndb_global.h>
|
|
#include "NdbApi.hpp"
|
|
#include <NdbSchemaCon.hpp>
|
|
#include <md5_hash.hpp>
|
|
|
|
#include <NdbThread.h>
|
|
#include <NdbMutex.h>
|
|
#include <NdbCondition.h>
|
|
#include <NdbSleep.h>
|
|
#include <NdbTick.h>
|
|
#include <NdbOut.hpp>
|
|
#include <NdbTimer.hpp>
|
|
#include <NDBT_Error.hpp>
|
|
|
|
#include <NdbTest.hpp>
|
|
#include <NDBT_Stats.hpp>
|
|
#include <NdbLockCpuUtil.h>
|
|
#include <ndb_limits.h>
|
|
|
|
#define MAX_PARTS 4
|
|
#define MAX_SEEK 16
|
|
#define MAXSTRLEN 16
|
|
#define MAXATTR 511
|
|
#define MAXTABLES 128
|
|
#define MAXINDEXES 16
|
|
#define NDB_MAXTHREADS 384
|
|
#define MAX_EXECUTOR_THREADS 384
|
|
#define MAX_DEFINER_THREADS 32
|
|
#define MAX_REAL_THREADS 416
|
|
#define NDB_MAX_NODES (MAX_NDB_NODES - 1)
|
|
#define NDB_MAX_RECEIVE_CPUS 128
|
|
/*
|
|
NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a
|
|
#define from <sys/thread.h> on AIX (IBM compiler). We explicitly
|
|
#undef it here lest someone use it by habit and get really funny
|
|
results. K&R says we may #undef non-existent symbols, so let's go.
|
|
*/
|
|
#undef MAXTHREADS
|
|
#define MAXPAR 1024
|
|
#define MAXATTRSIZE 1000
|
|
#define PKSIZE 2
|
|
|
|
enum StartType {
|
|
stIdle = 0,
|
|
stInsert = 1,
|
|
stRead = 2,
|
|
stUpdate = 3,
|
|
stDelete = 4,
|
|
stStop = 5
|
|
} ;
|
|
|
|
enum RunType {
|
|
RunInsert = 1,
|
|
RunRead = 2,
|
|
RunUpdate = 3,
|
|
RunDelete = 4,
|
|
RunCreateTable = 5,
|
|
RunDropTable = 6,
|
|
RunAll = 7
|
|
};
|
|
|
|
struct ThreadNdb
|
|
{
|
|
int NoOfOps;
|
|
int ThreadNo;
|
|
char * record;
|
|
};
|
|
|
|
typedef struct KeyOperation KEY_OPERATION;
|
|
struct KeyOperation
|
|
{
|
|
Uint32 first_key;
|
|
Uint32 second_key;
|
|
Uint32 table_id;
|
|
Uint32 definer_thread_id;
|
|
Uint32 executor_thread_id;
|
|
RunType operation_type;
|
|
KEY_OPERATION *next_key_op;
|
|
};
|
|
|
|
typedef struct key_list_header KEY_LIST_HEADER;
|
|
struct key_list_header
|
|
{
|
|
KEY_OPERATION *first_in_list;
|
|
KEY_OPERATION *last_in_list;
|
|
Uint32 num_in_list;
|
|
};
|
|
|
|
|
|
typedef struct thread_data_struct THREAD_DATA;
|
|
struct thread_data_struct
|
|
{
|
|
KEY_LIST_HEADER list_header;
|
|
Uint32 thread_id;
|
|
bool ready;
|
|
bool stop;
|
|
bool start;
|
|
Uint32 rand_seed;
|
|
|
|
char *record;
|
|
NdbMutex *transport_mutex;
|
|
struct NdbCondition *transport_cond;
|
|
struct NdbCondition *main_cond;
|
|
struct NdbCondition *start_cond;
|
|
char not_used[52];
|
|
};
|
|
THREAD_DATA thread_data_array[MAX_DEFINER_THREADS + MAX_EXECUTOR_THREADS];
|
|
|
|
extern "C" { static void* threadLoop(void*); }
|
|
static void setAttrNames(void);
|
|
static void setTableNames(void);
|
|
static int readArguments(int argc, char** argv);
|
|
static void dropTables(Ndb* pMyNdb);
|
|
static int createTables(Ndb*);
|
|
static void defineOperation(NdbConnection* aTransObject, StartType aType,
|
|
Uint32 base, Uint32 aIndex);
|
|
static void defineNdbRecordOperation(char*,
|
|
Uint32 table_id,
|
|
NdbConnection* aTransObject,
|
|
StartType aType,
|
|
Uint32 base,
|
|
Uint32 aIndex,
|
|
THREAD_DATA *my_thread_data);
|
|
static void execute(StartType aType);
|
|
static bool executeThread(ThreadNdb*, StartType aType, Ndb* aNdbObject, unsigned int);
|
|
static bool executeTransLoop(ThreadNdb* pThread, StartType aType, Ndb* aNdbObject,
|
|
unsigned int threadBase, int threadNo);
|
|
static void executeCallback(int result, NdbConnection* NdbObject,
|
|
void* aObject);
|
|
static bool error_handler(const NdbError & err);
|
|
static void input_error();
|
|
static Uint32 get_my_node_id(Uint32 tableNo, Uint32 threadNo);
|
|
|
|
static bool main_thread(RunType run_type, NdbTimer & timer);
|
|
static Uint64 get_total_transactions();
|
|
static Uint64 get_total_rounds();
|
|
static void run_old_flexAsynch(ThreadNdb *pThreadData, NdbTimer & timer);
|
|
|
|
static int retry_opt = 3 ;
|
|
static int failed = 0 ;
|
|
|
|
ErrorData * flexAsynchErrorData;
|
|
|
|
static NdbThread* threadLife[MAX_REAL_THREADS];
|
|
static int tNodeId;
|
|
static int ThreadReady[MAX_REAL_THREADS];
|
|
static longlong ThreadExecutions[MAX_REAL_THREADS];
|
|
static Uint64 ThreadExecutionRounds[MAX_REAL_THREADS];
|
|
static StartType ThreadStart[NDB_MAXTHREADS];
|
|
static char tableName[MAXTABLES][MAXSTRLEN+1];
|
|
static char indexName[MAXTABLES][MAXINDEXES][MAXSTRLEN+1];
|
|
static const NdbDictionary::Table * tables[MAXTABLES];
|
|
static char attrName[MAXATTR][MAXSTRLEN+1];
|
|
static bool nodeTableArray[MAXTABLES][NDB_MAX_NODES + 1];
|
|
static Uint32 numberNodeTable[MAXTABLES];
|
|
static Uint16 receiveCPUArray[NDB_MAX_RECEIVE_CPUS];
|
|
static Uint16 definerCPUArray[NDB_MAX_RECEIVE_CPUS];
|
|
static Uint16 executorCPUArray[NDB_MAX_RECEIVE_CPUS];
|
|
static Uint32 numberOfReceiveCPU = 0;
|
|
static Uint32 numberOfDefinerCPU = 0;
|
|
static Uint32 numberOfExecutorCPU = 0;
|
|
static RunType tRunType = RunAll;
|
|
static int tStdTableNum = 0;
|
|
static int tWarmupTime = 10; //Seconds
|
|
static int tExecutionTime = 30; //Seconds
|
|
static int tCooldownTime = 10; //Seconds
|
|
|
|
// Program Parameters
|
|
static NdbRecord * g_record[MAXTABLES];
|
|
static bool tNdbRecord = false;
|
|
|
|
static int tLocal = 0;
|
|
static int tSendForce = 0;
|
|
static int tNoOfLoops = 1;
|
|
static int tAttributeSize = 1;
|
|
static unsigned int tNoOfThreads = 1;
|
|
static unsigned int tNoOfParallelTrans = 32;
|
|
static unsigned int tNumTables = 1;
|
|
static unsigned int tNumIndexes = 0;
|
|
static unsigned int tNoOfAttributes = 25;
|
|
static unsigned int tNoOfTransactions = 500;
|
|
static unsigned int tNoOfOpsPerTrans = 1;
|
|
static unsigned int tLoadFactor = 80;
|
|
static bool tempTable = false;
|
|
static bool startTransGuess = true;
|
|
static int tExtraReadLoop = 0;
|
|
static bool tNew = false;
|
|
static bool tImmediate = false;
|
|
|
|
//Program Flags
|
|
static int theTestFlag = 0;
|
|
static int theSimpleFlag = 0;
|
|
static int theDirtyFlag = 0;
|
|
static int theWriteFlag = 0;
|
|
static int theStdTableNameFlag = 0;
|
|
static int theTableCreateFlag = 0;
|
|
static int tConnections = 1;
|
|
|
|
#define START_REAL_TIME
|
|
#define STOP_REAL_TIME
|
|
#define DEFINE_TIMER NdbTimer timer
|
|
#define START_TIMER timer.doStart();
|
|
#define STOP_TIMER timer.doStop();
|
|
#define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans)
|
|
|
|
NDBT_Stats a_i, a_u, a_d, a_r;
|
|
|
|
static
|
|
void
|
|
print(const char * name, NDBT_Stats& s)
|
|
{
|
|
printf("%s average: %u/s min: %u/s max: %u/s stddev: %u%%\n",
|
|
name,
|
|
(unsigned)s.getMean(),
|
|
(unsigned)s.getMin(),
|
|
(unsigned)s.getMax(),
|
|
(unsigned)(100*s.getStddev() / s.getMean()));
|
|
}
|
|
|
|
static void
|
|
resetThreads(){
|
|
|
|
for (unsigned i = 0; i < tNoOfThreads ; i++) {
|
|
ThreadReady[i] = 0;
|
|
ThreadStart[i] = stIdle;
|
|
}//for
|
|
}
|
|
|
|
static void
|
|
waitForThreads(Uint32 num_threads_to_wait_for)
|
|
{
|
|
int cont = 0;
|
|
do {
|
|
cont = 0;
|
|
NdbSleep_MilliSleep(20);
|
|
for (unsigned i = 0; i < num_threads_to_wait_for ; i++) {
|
|
if (ThreadReady[i] == 0) {
|
|
// Found one thread not yet ready, continue waiting
|
|
cont = 1;
|
|
break;
|
|
}//if
|
|
}//for
|
|
} while (cont == 1);
|
|
}
|
|
|
|
static void
|
|
tellThreads(StartType what)
|
|
{
|
|
for (unsigned i = 0; i < tNoOfThreads ; i++)
|
|
ThreadStart[i] = what;
|
|
}
|
|
|
|
static Ndb_cluster_connection *g_cluster_connection[64];
|
|
|
|
int main(int argc, char** argv)
|
|
{
|
|
ndb_init();
|
|
ThreadNdb* pThreadData;
|
|
DEFINE_TIMER;
|
|
int returnValue = NDBT_OK;
|
|
|
|
flexAsynchErrorData = new ErrorData;
|
|
flexAsynchErrorData->resetErrorCounters();
|
|
|
|
if (readArguments(argc, argv) != 0){
|
|
input_error();
|
|
return NDBT_ProgramExit(NDBT_WRONGARGS);
|
|
}
|
|
|
|
pThreadData = new ThreadNdb[NDB_MAXTHREADS];
|
|
|
|
ndbout << endl << "FLEXASYNCH - Starting normal mode" << endl;
|
|
ndbout << "Perform benchmark of insert, update and delete transactions";
|
|
ndbout << endl;
|
|
ndbout << " " << tNoOfThreads << " number of concurrent threads " << endl;
|
|
ndbout << " " << tNoOfParallelTrans;
|
|
ndbout << " number of parallel operation per thread " << endl;
|
|
ndbout << " " << tNoOfTransactions << " transaction(s) per round " << endl;
|
|
if (tRunType == RunAll){
|
|
ndbout << " " << tNoOfLoops << " iterations " << endl;
|
|
} else if (tRunType == RunRead || tRunType == RunUpdate){
|
|
ndbout << " Warmup time is " << tWarmupTime << endl;
|
|
ndbout << " Execution time is " << tExecutionTime << endl;
|
|
ndbout << " Cooldown time is " << tCooldownTime << endl;
|
|
}
|
|
ndbout << " " << "Load Factor is " << tLoadFactor << "%" << endl;
|
|
ndbout << " " << tNumTables << " tables " << endl;
|
|
ndbout << " " << tNoOfAttributes << " attributes per table " << endl;
|
|
ndbout << " " << tNumIndexes << " ordered indexes per table " << endl;
|
|
ndbout << " " << tAttributeSize;
|
|
ndbout << " is the number of 32 bit words per attribute " << endl;
|
|
if (tempTable == true) {
|
|
ndbout << " Tables are without logging " << endl;
|
|
} else {
|
|
ndbout << " Tables are with logging " << endl;
|
|
}//if
|
|
if (startTransGuess == true) {
|
|
ndbout << " Transactions are executed with hint provided" << endl;
|
|
} else {
|
|
ndbout << " Transactions are executed with round robin scheme" << endl;
|
|
}//if
|
|
if (tSendForce == 0) {
|
|
ndbout << " No force send is used, adaptive algorithm used" << endl;
|
|
} else if (tSendForce == 1) {
|
|
ndbout << " Force send used" << endl;
|
|
} else {
|
|
ndbout << " No force send is used, adaptive algorithm disabled" << endl;
|
|
}//if
|
|
|
|
ndbout << endl;
|
|
|
|
NdbThread_SetConcurrencyLevel(2 + (tNoOfThreads * 5 / 4));
|
|
|
|
/* print Setting */
|
|
flexAsynchErrorData->printSettings(ndbout);
|
|
|
|
setAttrNames();
|
|
setTableNames();
|
|
|
|
if (tConnections > 1)
|
|
{
|
|
printf("Creating %u connections...", tConnections);
|
|
fflush(stdout);
|
|
}
|
|
for (int i = 0; i < tConnections; i++)
|
|
{
|
|
g_cluster_connection[i] = new Ndb_cluster_connection();
|
|
ndbout_c("CPU for this connection: %u", receiveCPUArray[i]);
|
|
}
|
|
for (int i = 0; i < tConnections; i++)
|
|
{
|
|
if(g_cluster_connection[i]->connect(12, 5, 1) != 0)
|
|
return NDBT_ProgramExit(NDBT_FAILED);
|
|
if (numberOfReceiveCPU != 0)
|
|
{
|
|
g_cluster_connection[i]->set_recv_thread_cpu(&receiveCPUArray[i],
|
|
Uint32(1));
|
|
}
|
|
}
|
|
if (tConnections > 1)
|
|
{
|
|
printf("\n");
|
|
fflush(stdout);
|
|
}
|
|
|
|
Ndb * pNdb = new Ndb(g_cluster_connection[0], "TEST_DB");
|
|
pNdb->init();
|
|
tNodeId = pNdb->getNodeId();
|
|
|
|
ndbout << " NdbAPI node with id = " << pNdb->getNodeId() << endl;
|
|
ndbout << endl;
|
|
|
|
ndbout << "Waiting for ndb to become ready..." <<endl;
|
|
if (pNdb->waitUntilReady(10000) != 0){
|
|
ndbout << "NDB is not ready" << endl;
|
|
ndbout << "Benchmark failed!" << endl;
|
|
return NDBT_ProgramExit(NDBT_FAILED);
|
|
}
|
|
|
|
if (tRunType == RunCreateTable)
|
|
{
|
|
if (createTables(pNdb) != 0){
|
|
returnValue = NDBT_FAILED;
|
|
}
|
|
}
|
|
else if (tRunType == RunDropTable)
|
|
{
|
|
dropTables(pNdb);
|
|
}
|
|
else if(returnValue == NDBT_OK){
|
|
if (createTables(pNdb) != 0){
|
|
returnValue = NDBT_FAILED;
|
|
}
|
|
}
|
|
|
|
if (returnValue == NDBT_OK &&
|
|
tNdbRecord && !tNew)
|
|
{
|
|
Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
|
|
sz += 3;
|
|
for (Uint32 i = 0; i<tNoOfThreads; i++)
|
|
{
|
|
pThreadData[i].record = (char*)malloc(sz);
|
|
bzero(pThreadData[i].record, sz);
|
|
}
|
|
}
|
|
|
|
if(returnValue == NDBT_OK &&
|
|
tRunType != RunCreateTable &&
|
|
tRunType != RunDropTable){
|
|
if (tNew)
|
|
{
|
|
if (!main_thread(tRunType, timer))
|
|
{
|
|
returnValue = NDBT_FAILED;
|
|
}
|
|
}
|
|
else
|
|
{
|
|
run_old_flexAsynch(pThreadData, timer);
|
|
}
|
|
}
|
|
|
|
if (tRunType == RunAll)
|
|
{
|
|
dropTables(pNdb);
|
|
}
|
|
delete [] pThreadData;
|
|
delete pNdb;
|
|
|
|
if (tRunType == RunAll ||
|
|
tRunType == RunInsert ||
|
|
tRunType == RunDelete ||
|
|
tRunType == RunUpdate ||
|
|
tRunType == RunRead)
|
|
{
|
|
//printing errorCounters
|
|
flexAsynchErrorData->printErrorCounters(ndbout);
|
|
if (tRunType == RunAll) {
|
|
print("insert", a_i);
|
|
print("update", a_u);
|
|
print("delete", a_d);
|
|
print("read ", a_r);
|
|
}
|
|
}
|
|
if ((tRunType == RunInsert ||
|
|
tRunType == RunRead ||
|
|
tRunType == RunUpdate ||
|
|
tRunType == RunDelete) &&
|
|
returnValue == NDBT_OK)
|
|
{
|
|
Uint64 total_transactions = 0;
|
|
Uint64 total_rounds = 0;
|
|
Uint64 exec_time;
|
|
|
|
if (tNew)
|
|
{
|
|
total_transactions = get_total_transactions();
|
|
total_rounds = get_total_rounds();
|
|
}
|
|
else
|
|
{
|
|
if (tRunType == RunInsert || tRunType == RunDelete)
|
|
{
|
|
total_transactions = (longlong)tNoOfTransactions;
|
|
total_transactions *= (longlong)tNoOfThreads;
|
|
total_transactions *= (longlong)tNoOfParallelTrans;
|
|
}
|
|
else
|
|
{
|
|
for (Uint32 i = 0; i < tNoOfThreads; i++)
|
|
{
|
|
total_transactions += ThreadExecutions[i];
|
|
total_rounds += ThreadExecutionRounds[i];
|
|
}
|
|
}
|
|
}
|
|
Uint64 mean_rounds;
|
|
if (total_rounds)
|
|
{
|
|
mean_rounds = total_transactions / total_rounds;
|
|
}
|
|
else
|
|
{
|
|
mean_rounds = 0;
|
|
}
|
|
if (tRunType == RunInsert || tRunType == RunDelete) {
|
|
exec_time = (Uint64)timer.elapsedTime();
|
|
} else {
|
|
exec_time = (Uint64)tExecutionTime * 1000;
|
|
}
|
|
ndbout << "Total number of transactions is " << total_transactions;
|
|
ndbout << endl;
|
|
ndbout << "Execution time is " << exec_time << " milliseconds" << endl;
|
|
|
|
if (!exec_time)
|
|
{
|
|
exec_time = 1; /* Avoid floating point exception */
|
|
ndbout_c("Zero execution time!!!");
|
|
}
|
|
total_transactions = (total_transactions * 1000) / exec_time;
|
|
int trans_per_sec = (int)total_transactions;
|
|
ndbout << "Total transactions per second " << trans_per_sec << endl;
|
|
ndbout << "Mean executions per round is " << mean_rounds << endl;
|
|
}
|
|
|
|
for (int i = 0; i < tConnections; i++)
|
|
{
|
|
delete g_cluster_connection[i];
|
|
}
|
|
|
|
return NDBT_ProgramExit(returnValue);
|
|
}//main()
|
|
|
|
|
|
static void execute(StartType aType)
|
|
{
|
|
resetThreads();
|
|
tellThreads(aType);
|
|
waitForThreads(tNoOfThreads);
|
|
}//execute()
|
|
|
|
static void*
|
|
threadLoop(void* ThreadData)
|
|
{
|
|
Ndb* localNdb;
|
|
StartType tType;
|
|
ThreadNdb* tabThread = (ThreadNdb*)ThreadData;
|
|
int threadNo = tabThread->ThreadNo;
|
|
localNdb = new Ndb(g_cluster_connection[threadNo % tConnections], "TEST_DB");
|
|
localNdb->init(MAXPAR);
|
|
localNdb->waitUntilReady(10000);
|
|
unsigned int threadBase = threadNo;
|
|
|
|
|
|
for (;;){
|
|
while (ThreadStart[threadNo] == stIdle) {
|
|
NdbSleep_MilliSleep(10);
|
|
}//while
|
|
|
|
// Check if signal to exit is received
|
|
if (ThreadStart[threadNo] == stStop) {
|
|
break;
|
|
}//if
|
|
|
|
tType = ThreadStart[threadNo];
|
|
ThreadStart[threadNo] = stIdle;
|
|
if (tRunType == RunAll || tRunType == RunInsert || tRunType == RunDelete){
|
|
if(!executeThread(tabThread,
|
|
tType,
|
|
localNdb,
|
|
threadBase)){
|
|
break;
|
|
}
|
|
} else {
|
|
if(!executeTransLoop(tabThread,
|
|
tType,
|
|
localNdb,
|
|
threadBase,
|
|
threadNo)){
|
|
break;
|
|
}
|
|
}
|
|
ThreadReady[threadNo] = 1;
|
|
}//for
|
|
|
|
delete localNdb;
|
|
ThreadReady[threadNo] = 1;
|
|
|
|
return NULL;
|
|
}//threadLoop()
|
|
|
|
static int error_count = 0;
|
|
|
|
static bool
|
|
update_num_ops(Uint32 *num_ops, NdbConnection **tConArray)
|
|
{
|
|
/*
|
|
Move num_ops forward to next unused position, can be old
|
|
transactions still outstanding
|
|
*/
|
|
for ( ; *num_ops < tNoOfParallelTrans; (*num_ops)++)
|
|
{
|
|
if (tConArray[*num_ops])
|
|
continue;
|
|
else
|
|
break;
|
|
}
|
|
if (*num_ops == tNoOfParallelTrans)
|
|
return true;
|
|
return false;
|
|
}
|
|
|
|
static int
|
|
executeTrans(ThreadNdb* pThread,
|
|
StartType aType,
|
|
Ndb* aNdbObject,
|
|
unsigned int threadBase,
|
|
unsigned int record,
|
|
Uint32 nodeId,
|
|
NdbConnection **tConArray,
|
|
bool execute_all)
|
|
{
|
|
unsigned int tBase;
|
|
unsigned int tBase2;
|
|
Uint32 threadBaseLoc, threadBaseLoc2;
|
|
Uint32 num_ops = 0;
|
|
Uint32 i, loops;
|
|
|
|
START_REAL_TIME;
|
|
for (i = record, loops = 0;
|
|
i < tNoOfTransactions &&
|
|
loops < 16 &&
|
|
num_ops < tNoOfParallelTrans;
|
|
i++, loops++)
|
|
{
|
|
tBase = i * tNoOfParallelTrans * tNoOfOpsPerTrans;
|
|
threadBaseLoc = (threadBase * tNoOfTransactions * tNoOfParallelTrans) +
|
|
(i * tNoOfParallelTrans);
|
|
for (unsigned int j = 0; j < tNoOfParallelTrans; j++) {
|
|
if (update_num_ops(&num_ops, tConArray))
|
|
break;
|
|
threadBaseLoc2 = threadBaseLoc + j;
|
|
tBase2 = tBase + (j * tNoOfOpsPerTrans);
|
|
if (startTransGuess == true) {
|
|
union {
|
|
Uint64 _align;
|
|
Uint32 Tkey32[2];
|
|
};
|
|
(void)_align;
|
|
|
|
Tkey32[0] = threadBaseLoc2;
|
|
Tkey32[1] = tBase2;
|
|
Ndb::Key_part_ptr hint[2];
|
|
hint[0].ptr = Tkey32+0;
|
|
hint[0].len = 4;
|
|
hint[1].ptr = 0;
|
|
hint[1].len = 0;
|
|
|
|
tConArray[num_ops] = aNdbObject->startTransaction(tables[0], hint);
|
|
}
|
|
else
|
|
{
|
|
tConArray[num_ops] = aNdbObject->startTransaction();
|
|
}
|
|
|
|
if (tConArray[num_ops] == NULL){
|
|
error_handler(aNdbObject->getNdbError());
|
|
ndbout << endl << "Unable to recover! Quitting now" << endl ;
|
|
return -1;
|
|
}//if
|
|
|
|
if (nodeId != 0 &&
|
|
tConArray[num_ops]->getConnectedNodeId() != nodeId)
|
|
{
|
|
/*
|
|
We're running only local operations, this won't be local,
|
|
ignore this record
|
|
*/
|
|
aNdbObject->closeTransaction(tConArray[num_ops]);
|
|
tConArray[num_ops] = NULL;
|
|
continue;
|
|
}
|
|
for (unsigned int k = 0; k < tNoOfOpsPerTrans; k++) {
|
|
//-------------------------------------------------------
|
|
// Define the operation, but do not execute it yet.
|
|
//-------------------------------------------------------
|
|
if (tNdbRecord)
|
|
defineNdbRecordOperation(pThread->record,
|
|
(Uint32)0,
|
|
tConArray[num_ops],
|
|
aType,
|
|
threadBaseLoc2,
|
|
(tBase2+k),
|
|
NULL);
|
|
else
|
|
defineOperation(tConArray[num_ops],
|
|
aType,
|
|
threadBaseLoc2,
|
|
(tBase2 + k));
|
|
}//for
|
|
|
|
tConArray[num_ops]->executeAsynchPrepare(Commit,
|
|
&executeCallback,
|
|
(void*)&tConArray[num_ops]);
|
|
num_ops++;
|
|
}//for
|
|
}//for
|
|
STOP_REAL_TIME;
|
|
if (num_ops == 0)
|
|
return 0;
|
|
//-------------------------------------------------------
|
|
// Now we have defined a set of operations, it is now time
|
|
// to execute all of them. If execute_all isn't set, we
|
|
// only execute at least half of them. In this manner we
|
|
// can cater for different execution speeds in different
|
|
// parts of the system.
|
|
//-------------------------------------------------------
|
|
int min_execs = execute_all ? (int)num_ops :
|
|
(num_ops > 1 ? (int)(num_ops / 2) : 1);
|
|
int Tcomp = aNdbObject->sendPollNdb(3000,
|
|
min_execs,
|
|
tSendForce);
|
|
while (Tcomp < min_execs) {
|
|
int TlocalComp = aNdbObject->pollNdb(3000, min_execs - Tcomp);
|
|
Tcomp += TlocalComp;
|
|
}//while
|
|
if (aNdbObject->getNdbError().code != 0 && error_count < 10000){
|
|
error_count++;
|
|
ndbout << "i = " << i << ", error = ";
|
|
ndbout << aNdbObject->getNdbError().code << ", threadBase = ";
|
|
ndbout << hex << threadBase << endl;
|
|
}
|
|
return Tcomp;
|
|
}
|
|
|
|
static
|
|
bool
|
|
executeTransLoop(ThreadNdb* pThread,
|
|
StartType aType,
|
|
Ndb* aNdbObject,
|
|
unsigned int threadBase,
|
|
int threadNo) {
|
|
bool continue_flag = true;
|
|
int time_expired;
|
|
unsigned int i = 0;
|
|
Uint32 nodeId;
|
|
int ops = 0;
|
|
int record;
|
|
Uint32 local_count = 0;
|
|
Uint64 executions = 0;
|
|
bool execute_all = true;
|
|
DEFINE_TIMER;
|
|
NdbConnection* tConArray[MAXPAR];
|
|
|
|
for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
|
|
if (tLocal > 0)
|
|
{
|
|
nodeId = get_my_node_id((Uint32)0, threadBase);
|
|
}
|
|
else
|
|
nodeId = 0;
|
|
ThreadExecutions[threadNo] = 0;
|
|
ThreadExecutionRounds[threadNo] = 0;
|
|
START_TIMER;
|
|
do
|
|
{
|
|
executions++;
|
|
if (tLocal == 2)
|
|
{
|
|
/* Select node on round robin basis */
|
|
local_count++;
|
|
nodeId = get_my_node_id((Uint32)0, local_count);
|
|
}
|
|
else if (tLocal == 3)
|
|
{
|
|
/* Select node on random basis */
|
|
local_count = (Uint32)(rand() % numberNodeTable[0]);
|
|
nodeId = get_my_node_id((Uint32)0, local_count);
|
|
}
|
|
record = rand() % tNoOfTransactions;
|
|
if ((ops = executeTrans(pThread,
|
|
aType,
|
|
aNdbObject,
|
|
threadBase,
|
|
(Uint32)record,
|
|
nodeId,
|
|
tConArray,
|
|
execute_all)) < 0)
|
|
return false;
|
|
STOP_TIMER;
|
|
if (!continue_flag)
|
|
break;
|
|
time_expired = (int)(timer.elapsedTime() / 1000);
|
|
if (time_expired < tWarmupTime)
|
|
; //Do nothing
|
|
else if (time_expired < (tWarmupTime + tExecutionTime)){
|
|
executions += ops; //Count measurement
|
|
}
|
|
else if (time_expired < (tWarmupTime + tExecutionTime + tCooldownTime))
|
|
; //Do nothing
|
|
else
|
|
{
|
|
execute_all = true;
|
|
continue_flag = false; //Time expired
|
|
}
|
|
if (i == tNoOfTransactions) /* Make sure the record exists */
|
|
i = 0;
|
|
} while (1);
|
|
ThreadExecutions[threadNo] = executions;
|
|
ThreadExecutionRounds[threadNo] = executions;
|
|
return true;
|
|
}//executeTransLoop()
|
|
|
|
static
|
|
bool
|
|
executeThread(ThreadNdb* pThread,
|
|
StartType aType,
|
|
Ndb* aNdbObject,
|
|
unsigned int threadBase) {
|
|
NdbConnection* tConArray[MAXPAR];
|
|
|
|
for (Uint32 i = 0; i < MAXPAR; i++) tConArray[i] = NULL;
|
|
for (unsigned int i = 0; i < tNoOfTransactions; i++) {
|
|
if ((executeTrans(pThread,
|
|
aType,
|
|
aNdbObject,
|
|
threadBase,
|
|
i,
|
|
(Uint32)0,
|
|
tConArray,
|
|
true)) < 0)
|
|
return false;
|
|
}//for
|
|
return true;
|
|
}//executeThread()
|
|
|
|
static void
|
|
executeCallback(int result, NdbConnection* transObject, void* aObject)
|
|
{
|
|
NdbConnection **array_ref = (NdbConnection**)aObject;
|
|
require(transObject == *array_ref);
|
|
*array_ref = NULL;
|
|
if (result == -1 && failed < 100)
|
|
{
|
|
|
|
// Add complete error handling here
|
|
|
|
int retCode = flexAsynchErrorData->handleErrorCommon(transObject->getNdbError());
|
|
if (retCode == 1)
|
|
{
|
|
if (transObject->getNdbError().code != 626 &&
|
|
transObject->getNdbError().code != 630)
|
|
{
|
|
ndbout_c("execute: %s", transObject->getNdbError().message);
|
|
ndbout_c("Error code = %d", transObject->getNdbError().code);
|
|
}
|
|
}
|
|
else if (retCode == 2)
|
|
{
|
|
ndbout << "4115 should not happen in flexAsynch" << endl;
|
|
}
|
|
else if (retCode == 3)
|
|
{
|
|
/* What can we do here? */
|
|
ndbout_c("execute: %s", transObject->getNdbError().message);
|
|
}//if(retCode == 3)
|
|
// ndbout << "Error occurred in poll:" << endl;
|
|
// ndbout << NdbObject->getNdbError() << endl;
|
|
failed++ ;
|
|
}//if
|
|
transObject->close(); /* Close transaction */
|
|
return;
|
|
}//executeCallback()
|
|
|
|
|
|
|
|
static void
|
|
defineOperation(NdbConnection* localNdbConnection, StartType aType,
|
|
Uint32 threadBase, Uint32 aIndex)
|
|
{
|
|
NdbOperation* localNdbOperation;
|
|
unsigned int loopCountAttributes = tNoOfAttributes;
|
|
unsigned int countAttributes;
|
|
Uint32 attrValue[MAXATTRSIZE];
|
|
|
|
//-------------------------------------------------------
|
|
// Set-up the attribute values for this operation.
|
|
//-------------------------------------------------------
|
|
attrValue[0] = threadBase;
|
|
attrValue[1] = aIndex;
|
|
for (unsigned k = 2; k < loopCountAttributes; k++) {
|
|
attrValue[k] = aIndex;
|
|
}//for
|
|
localNdbOperation = localNdbConnection->getNdbOperation(tableName[0]);
|
|
if (localNdbOperation == NULL) {
|
|
error_handler(localNdbConnection->getNdbError());
|
|
}//if
|
|
switch (aType) {
|
|
case stInsert: { // Insert case
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else {
|
|
localNdbOperation->insertTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stRead: { // Read Case
|
|
if (theSimpleFlag == 1) {
|
|
localNdbOperation->simpleRead();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyRead();
|
|
} else {
|
|
localNdbOperation->readTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stUpdate: { // Update Case
|
|
if (theWriteFlag == 1 && theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyWrite();
|
|
} else if (theWriteFlag == 1) {
|
|
localNdbOperation->writeTuple();
|
|
} else if (theDirtyFlag == 1) {
|
|
localNdbOperation->dirtyUpdate();
|
|
} else {
|
|
localNdbOperation->updateTuple();
|
|
}//if
|
|
break;
|
|
}//case
|
|
case stDelete: { // Delete Case
|
|
localNdbOperation->deleteTuple();
|
|
break;
|
|
}//case
|
|
default: {
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//default
|
|
}//switch
|
|
|
|
localNdbOperation->equal((Uint32)0, (char*)(attrValue + 0));
|
|
localNdbOperation->equal((Uint32)1, (char*)(attrValue + 1));
|
|
switch (aType) {
|
|
case stInsert: // Insert case
|
|
case stUpdate: // Update Case
|
|
{
|
|
for (countAttributes = 1;
|
|
countAttributes < loopCountAttributes; countAttributes++) {
|
|
localNdbOperation->setValue(countAttributes + 1,
|
|
(char*)&attrValue[0]);
|
|
}//for
|
|
break;
|
|
}//case
|
|
case stRead: { // Read Case
|
|
for (countAttributes = 1;
|
|
countAttributes < loopCountAttributes; countAttributes++) {
|
|
localNdbOperation->getValue(countAttributes + 1,
|
|
(char*)&attrValue[0]);
|
|
}//for
|
|
break;
|
|
}//case
|
|
case stDelete: { // Delete Case
|
|
break;
|
|
}//case
|
|
default: {
|
|
//goto error_handler; < epaulsa
|
|
error_handler(localNdbOperation->getNdbError());
|
|
}//default
|
|
}//switch
|
|
return;
|
|
}//defineOperation()
|
|
|
|
|
|
static void
|
|
defineNdbRecordOperation(char *record,
|
|
Uint32 table_id,
|
|
NdbConnection* pTrans,
|
|
StartType aType,
|
|
Uint32 threadBase,
|
|
Uint32 aIndex,
|
|
THREAD_DATA *my_thread_data)
|
|
{
|
|
Uint32 offset;
|
|
Uint32 rand_val = 0;
|
|
NdbRecord *ndb_record = g_record[table_id];
|
|
NdbDictionary::getOffset(ndb_record, 0, offset);
|
|
* (Uint32*)(record + offset) = threadBase;
|
|
NdbDictionary::getOffset(ndb_record, 1, offset);
|
|
* (Uint32*)(record + offset) = aIndex;
|
|
|
|
//-------------------------------------------------------
|
|
// Set-up the attribute values for this operation.
|
|
//-------------------------------------------------------
|
|
if (aType != stRead && aType != stDelete)
|
|
{
|
|
if (my_thread_data != NULL && aType != stInsert)
|
|
{
|
|
#ifdef _WIN32
|
|
rand_val = rand();
|
|
#else
|
|
my_thread_data->rand_seed++;
|
|
rand_val = (Uint32)rand_r(&my_thread_data->rand_seed);
|
|
#endif
|
|
}
|
|
for (unsigned k = 1; k < tNoOfAttributes; k++) {
|
|
NdbDictionary::getOffset(ndb_record, k + 1, offset);
|
|
* (Uint32*)(record + offset) = aIndex + rand_val;
|
|
}//for
|
|
}
|
|
|
|
const NdbOperation* op;
|
|
switch (aType) {
|
|
case stInsert: { // Insert case
|
|
if (theWriteFlag == 1)
|
|
{
|
|
op = pTrans->writeTuple(ndb_record,
|
|
record,
|
|
ndb_record,
|
|
record);
|
|
}
|
|
else
|
|
{
|
|
op = pTrans->insertTuple(ndb_record,
|
|
record,
|
|
ndb_record,
|
|
record);
|
|
}
|
|
break;
|
|
}//case
|
|
case stRead: { // Read Case
|
|
op = pTrans->readTuple(ndb_record,
|
|
record,
|
|
ndb_record,
|
|
record,
|
|
NdbOperation::LM_CommittedRead);
|
|
break;
|
|
}//case
|
|
case stUpdate:{ // Update Case
|
|
op = pTrans->updateTuple(ndb_record,
|
|
record,
|
|
ndb_record,
|
|
record);
|
|
break;
|
|
}//case
|
|
case stDelete: { // Delete Case
|
|
op = pTrans->deleteTuple(ndb_record,
|
|
record,
|
|
ndb_record);
|
|
break;
|
|
}//case
|
|
default: {
|
|
abort();
|
|
}//default
|
|
}//switch
|
|
|
|
if (op == NULL)
|
|
{
|
|
ndbout << "Operation is null " << pTrans->getNdbError() << endl;
|
|
abort();
|
|
}
|
|
|
|
require(op != 0);
|
|
}
|
|
|
|
static void setAttrNames()
|
|
{
|
|
int i;
|
|
|
|
for (i = 0; i < MAXATTR ; i++){
|
|
BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i);
|
|
}
|
|
}
|
|
|
|
|
|
static void setTableNames()
|
|
{
|
|
// Note! Uses only uppercase letters in table name's
|
|
// so that we can look at the tables with SQL
|
|
unsigned int i, j;
|
|
for (i = 0; i < tNumTables ; i++)
|
|
{
|
|
if (theStdTableNameFlag==0)
|
|
{
|
|
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%u_%u", i,
|
|
(unsigned)(NdbTick_CurrentMillisecond()+rand()));
|
|
for (j = 0; j < tNumIndexes; j++)
|
|
{
|
|
BaseString::snprintf(indexName[i][j], MAXSTRLEN, "INDEX%u_%u_%u", i, j,
|
|
(unsigned)(NdbTick_CurrentMillisecond()+rand()));
|
|
|
|
}
|
|
}
|
|
else
|
|
{
|
|
BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%u", tStdTableNum, i);
|
|
for (j = 0; j < tNumIndexes; j++)
|
|
{
|
|
BaseString::snprintf(indexName[i][j], MAXSTRLEN, "INDEX%d_%u_%u",
|
|
tStdTableNum, i, j);
|
|
}
|
|
}
|
|
ndbout << "Using table name " << tableName[i] << endl;
|
|
}
|
|
}
|
|
|
|
static void
|
|
dropTables(Ndb* pMyNdb)
|
|
{
|
|
unsigned int i;
|
|
for (i = 0; i < tNumTables; i++)
|
|
{
|
|
ndbout << "Dropping table " << tableName[i] << "..." << endl;
|
|
pMyNdb->getDictionary()->dropTable(tableName[i]);
|
|
}
|
|
}
|
|
|
|
/*
|
|
Set up nodeTableArray with a boolean true for all nodes that
|
|
contains the table.
|
|
*/
|
|
static int
|
|
setUpNodeTableArray(Uint32 tableNo, const NdbDictionary::Table *pTab)
|
|
{
|
|
Uint32 numFragments = pTab->getFragmentCount();
|
|
printf("numFragments = %u\n", numFragments);
|
|
Uint32 nodeId;
|
|
for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
|
|
nodeTableArray[tableNo][i] = false;
|
|
for (Uint32 i = 0; i < numFragments; i++)
|
|
{
|
|
if ((pTab->getFragmentNodes(i, &nodeId, (Uint32)1)) == 0)
|
|
{
|
|
return 1;
|
|
}
|
|
nodeTableArray[tableNo][nodeId] = true;
|
|
}
|
|
numberNodeTable[tableNo] = 0;
|
|
for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
|
|
{
|
|
if (nodeTableArray[tableNo][i])
|
|
numberNodeTable[tableNo]++;
|
|
}
|
|
printf("number of nodes = %u\n", numberNodeTable[tableNo]);
|
|
return 0;
|
|
}
|
|
|
|
static Uint32
|
|
get_node_relative_id(Uint32 tableNo, Uint32 node_id)
|
|
{
|
|
Uint32 rel_id = 0;
|
|
|
|
for (Uint32 i = 1; i < node_id; i++)
|
|
{
|
|
if (nodeTableArray[tableNo][i])
|
|
rel_id++;
|
|
}
|
|
return rel_id;
|
|
}
|
|
|
|
static Uint32
|
|
get_node_count(Uint32 tableNo)
|
|
{
|
|
return get_node_relative_id(tableNo, NDB_MAX_NODES + 1);
|
|
}
|
|
|
|
static Uint32
|
|
get_my_node_id(Uint32 tableNo, Uint32 threadNo)
|
|
{
|
|
Uint32 count = 0;
|
|
Uint32 n = threadNo % numberNodeTable[tableNo];
|
|
for (Uint32 i = 1; i <= NDB_MAX_NODES; i++)
|
|
{
|
|
if (nodeTableArray[tableNo][i])
|
|
{
|
|
if (count == n)
|
|
return i;
|
|
count++;
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
int
|
|
createTables(Ndb* pMyNdb)
|
|
{
|
|
NdbDictionary::Dictionary* pDict = pMyNdb->getDictionary();
|
|
if (theTableCreateFlag == 0 || tRunType == RunCreateTable)
|
|
{
|
|
for(unsigned i=0; i < tNumTables ;i++)
|
|
{
|
|
ndbout << "Creating " << tableName[i] << "..." << endl;
|
|
|
|
NdbDictionary::Table tab;
|
|
tab.setName(tableName[i]);
|
|
if (tempTable)
|
|
{
|
|
tab.setLogging(false);
|
|
}
|
|
|
|
{
|
|
NdbDictionary::Column distkey;
|
|
distkey.setName("DISTKEY");
|
|
distkey.setType(NdbDictionary::Column::Unsigned);
|
|
distkey.setPrimaryKey(true);
|
|
distkey.setDistributionKey(true);
|
|
tab.addColumn(distkey);
|
|
}
|
|
|
|
{
|
|
NdbDictionary::Column pk;
|
|
pk.setName(attrName[0]);
|
|
pk.setType(NdbDictionary::Column::Unsigned);
|
|
pk.setPrimaryKey(true);
|
|
tab.addColumn(pk);
|
|
}
|
|
|
|
for (unsigned j = 1; j < tNoOfAttributes ; j++)
|
|
{
|
|
NdbDictionary::Column col;
|
|
col.setName(attrName[j]);
|
|
col.setType(NdbDictionary::Column::Unsigned);
|
|
col.setLength(tAttributeSize);
|
|
tab.addColumn(col);
|
|
}
|
|
{
|
|
int res = pDict->createTable(tab);
|
|
if (res != 0)
|
|
{
|
|
ndbout << pDict->getNdbError() << endl;
|
|
return -1;
|
|
}
|
|
}
|
|
for (unsigned j = 0; j < tNumIndexes; j++)
|
|
{
|
|
NdbDictionary::Index ndb_index(indexName[i][j]);
|
|
ndb_index.setType(NdbDictionary::Index::OrderedIndex);
|
|
ndb_index.setLogging(FALSE);
|
|
if (ndb_index.setTable(tableName[i]))
|
|
{
|
|
ndbout << "setTableError " << errno << endl;
|
|
}
|
|
if (ndb_index.addColumnName(attrName[j+1]))
|
|
{
|
|
ndbout << "addColumnName on Index Error " << errno << endl;
|
|
}
|
|
int res = pDict->createIndex(ndb_index, tab);
|
|
if (res != 0)
|
|
{
|
|
ndbout << pDict->getNdbError() << endl;
|
|
return -1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
for(unsigned i=0; i < tNumTables ;i++)
|
|
{
|
|
const NdbDictionary::Table * pTab = pDict->getTable(tableName[i]);
|
|
if (pTab == NULL)
|
|
{
|
|
error_handler(pDict->getNdbError());
|
|
return -1;
|
|
}
|
|
tables[i] = pTab;
|
|
if (setUpNodeTableArray(i, pTab))
|
|
{
|
|
error_handler(pDict->getNdbError());
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
if (tNdbRecord)
|
|
{
|
|
for(unsigned i=0; i < tNumTables ;i++)
|
|
{
|
|
const NdbDictionary::Table * pTab = tables[i];
|
|
|
|
int off = 0;
|
|
Vector<NdbDictionary::RecordSpecification> spec;
|
|
for (Uint32 j = 0; j<unsigned(pTab->getNoOfColumns()); j++)
|
|
{
|
|
NdbDictionary::RecordSpecification r0;
|
|
r0.column = pTab->getColumn(j);
|
|
r0.offset = off;
|
|
off += (r0.column->getSizeInBytes() + 3) & ~(Uint32)3;
|
|
spec.push_back(r0);
|
|
}
|
|
g_record[i] =
|
|
pDict->createRecord(pTab, spec.getBase(),
|
|
spec.size(),
|
|
sizeof(NdbDictionary::RecordSpecification));
|
|
require(g_record[i]);
|
|
}
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
bool error_handler(const NdbError & err){
|
|
ndbout << err << endl ;
|
|
switch(err.classification){
|
|
case NdbError::TemporaryResourceError:
|
|
case NdbError::OverloadError:
|
|
case NdbError::SchemaError:
|
|
ndbout << endl << "Attempting to recover and continue now..." << endl ;
|
|
return true;
|
|
default:
|
|
break;
|
|
}
|
|
return false ; // return false to abort
|
|
}
|
|
|
|
static void
|
|
setAggregateRun(void)
|
|
{
|
|
tNoOfLoops = 1;
|
|
tExtraReadLoop = 0;
|
|
theTableCreateFlag = 1;
|
|
}
|
|
|
|
/* Start NEW Module */
|
|
|
|
/**
|
|
* This part contains the code used for the case --local 4 which is using
|
|
* the design pattern that could be used for asynchronous applications of
|
|
* the NDB API.
|
|
*
|
|
* This variant will always use transaction hints, it will always the
|
|
* NDB Record format in the NDB API.
|
|
*/
|
|
|
|
static void* definer_thread(void *data);
|
|
static void* executor_thread(void *data);
|
|
|
|
static Uint32 tNoOfExecutorThreads = 0;
|
|
static Uint32 tNoOfDefinerThreads = 0;
|
|
static Uint32 tNumThreadGroups = 0;
|
|
static Uint32 tNumThreadGroupsPerDefinerThread = 0;
|
|
static Uint32 tNumNodes = 0;
|
|
|
|
enum RunState
|
|
{
|
|
WARMUP = 0,
|
|
EXECUTING = 1,
|
|
COOLDOWN = 2
|
|
};
|
|
|
|
RunState tRunState = WARMUP;
|
|
|
|
static Uint64
|
|
get_total_transactions()
|
|
{
|
|
Uint64 total_transactions = 0;
|
|
|
|
for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
|
|
{
|
|
total_transactions += ThreadExecutions[i];
|
|
}
|
|
return total_transactions;
|
|
}
|
|
|
|
static Uint64
|
|
get_total_rounds()
|
|
{
|
|
Uint64 total_rounds = 0;
|
|
|
|
for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
|
|
{
|
|
total_rounds += ThreadExecutionRounds[i];
|
|
}
|
|
return total_rounds;
|
|
}
|
|
|
|
static void
|
|
init_list_headers(KEY_LIST_HEADER *list_header,
|
|
Uint32 num_list_headers)
|
|
{
|
|
Uint32 i;
|
|
KEY_LIST_HEADER *list_header_ref;
|
|
char *list_header_ptr = (char*)list_header;
|
|
for (i = 0;
|
|
i < num_list_headers;
|
|
i++, list_header_ptr += sizeof(KEY_LIST_HEADER))
|
|
{
|
|
list_header_ref = (KEY_LIST_HEADER*)list_header_ptr;
|
|
list_header_ref->first_in_list = NULL;
|
|
list_header_ref->last_in_list = NULL;
|
|
list_header_ref->num_in_list = 0;
|
|
}
|
|
}
|
|
|
|
static void
|
|
wait_thread_ready(THREAD_DATA *my_thread_data)
|
|
{
|
|
NdbMutex_Lock(my_thread_data->transport_mutex);
|
|
while (1)
|
|
{
|
|
if (my_thread_data->ready)
|
|
break;
|
|
NdbCondition_Wait(my_thread_data->main_cond,
|
|
my_thread_data->transport_mutex);
|
|
}
|
|
NdbMutex_Unlock(my_thread_data->transport_mutex);
|
|
}
|
|
|
|
static void
|
|
wait_for_threads_ready(Uint32 num_threads)
|
|
{
|
|
for (Uint32 i = 0; i < num_threads; i++)
|
|
{
|
|
wait_thread_ready(&thread_data_array[i]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
signal_thread_to_start(THREAD_DATA *my_thread_data)
|
|
{
|
|
NdbMutex_Lock(my_thread_data->transport_mutex);
|
|
my_thread_data->start = true;
|
|
my_thread_data->ready = false;
|
|
NdbCondition_Signal(my_thread_data->start_cond);
|
|
NdbMutex_Unlock(my_thread_data->transport_mutex);
|
|
}
|
|
|
|
static void
|
|
signal_definer_threads_to_start()
|
|
{
|
|
for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
|
|
{
|
|
signal_thread_to_start(&thread_data_array[i]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
signal_executor_threads_to_start()
|
|
{
|
|
for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
|
|
{
|
|
signal_thread_to_start(&thread_data_array[tNoOfDefinerThreads + i]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
signal_thread_ready_wait_for_start(THREAD_DATA *my_thread_data)
|
|
{
|
|
NdbMutex_Lock(my_thread_data->transport_mutex);
|
|
my_thread_data->ready = true;
|
|
NdbCondition_Signal(my_thread_data->main_cond);
|
|
while (1)
|
|
{
|
|
if (my_thread_data->start)
|
|
break;
|
|
NdbCondition_Wait(my_thread_data->start_cond,
|
|
my_thread_data->transport_mutex);
|
|
}
|
|
my_thread_data->start = false;
|
|
NdbMutex_Unlock(my_thread_data->transport_mutex);
|
|
}
|
|
|
|
static void
|
|
signal_thread_to_stop(THREAD_DATA *my_thread_data)
|
|
{
|
|
NdbMutex_Lock(my_thread_data->transport_mutex);
|
|
my_thread_data->stop = true;
|
|
NdbCondition_Signal(my_thread_data->transport_cond);
|
|
NdbMutex_Unlock(my_thread_data->transport_mutex);
|
|
}
|
|
|
|
static void
|
|
signal_definer_threads_to_stop()
|
|
{
|
|
for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
|
|
{
|
|
signal_thread_to_stop(&thread_data_array[i]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
signal_executor_threads_to_stop()
|
|
{
|
|
for (Uint32 i = tNoOfDefinerThreads; i < tNoOfThreads; i++)
|
|
{
|
|
signal_thread_to_stop(&thread_data_array[i]);
|
|
}
|
|
}
|
|
|
|
static void
|
|
destroy_thread_data(THREAD_DATA *my_thread_data)
|
|
{
|
|
free(my_thread_data->record);
|
|
NdbMutex_Destroy(my_thread_data->transport_mutex);
|
|
NdbCondition_Destroy(my_thread_data->transport_cond);
|
|
NdbCondition_Destroy(my_thread_data->start_cond);
|
|
NdbCondition_Destroy(my_thread_data->main_cond);
|
|
}
|
|
|
|
static void
|
|
init_thread_data(THREAD_DATA *my_thread_data, Uint32 thread_id)
|
|
{
|
|
Uint32 sz = NdbDictionary::getRecordRowLength(g_record[0]);
|
|
my_thread_data->record = (char*)malloc(sz);
|
|
memset(my_thread_data->record, 0, sz);
|
|
init_list_headers(&my_thread_data->list_header, 1);
|
|
my_thread_data->stop = false;
|
|
my_thread_data->ready = false;
|
|
my_thread_data->start = false;
|
|
my_thread_data->rand_seed = 1;
|
|
#ifdef _WIN32
|
|
srand(my_thread_data->rand_seed);
|
|
#endif
|
|
my_thread_data->transport_mutex = NdbMutex_Create();
|
|
my_thread_data->transport_cond = NdbCondition_Create();
|
|
my_thread_data->main_cond = NdbCondition_Create();
|
|
my_thread_data->start_cond = NdbCondition_Create();
|
|
my_thread_data->thread_id = thread_id;
|
|
}
|
|
|
|
static void
|
|
create_definer_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
|
|
{
|
|
init_thread_data(my_thread_data, thread_id);
|
|
threadLife[thread_id] = NdbThread_Create(definer_thread,
|
|
(void**)my_thread_data,
|
|
1024 * 1024,
|
|
"flexAsynchThread",
|
|
NDB_THREAD_PRIO_LOW);
|
|
}
|
|
|
|
static void
|
|
create_definer_threads()
|
|
{
|
|
Uint32 cpu_id = 0;
|
|
for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
|
|
{
|
|
Uint32 thread_id = i;
|
|
create_definer_thread(&thread_data_array[thread_id], thread_id);
|
|
if (numberOfDefinerCPU != 0)
|
|
{
|
|
Ndb_LockCPU(threadLife[thread_id], Uint32(definerCPUArray[cpu_id]));
|
|
cpu_id++;
|
|
if (cpu_id == numberOfDefinerCPU)
|
|
{
|
|
cpu_id = 0;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static void
|
|
create_executor_thread(THREAD_DATA *my_thread_data, Uint32 thread_id)
|
|
{
|
|
init_thread_data(my_thread_data, thread_id);
|
|
threadLife[thread_id] = NdbThread_Create(executor_thread,
|
|
(void**)my_thread_data,
|
|
1024 * 1024,
|
|
"flexAsynchThread",
|
|
NDB_THREAD_PRIO_LOW);
|
|
}
|
|
|
|
static void
|
|
create_executor_threads()
|
|
{
|
|
Uint32 cpu_id = 0;
|
|
for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
|
|
{
|
|
Uint32 thread_id = tNoOfDefinerThreads + i;
|
|
create_executor_thread(&thread_data_array[thread_id], thread_id);
|
|
if (numberOfExecutorCPU != 0)
|
|
{
|
|
Ndb_LockCPU(threadLife[thread_id], Uint32(executorCPUArray[cpu_id]));
|
|
cpu_id++;
|
|
if (cpu_id == numberOfExecutorCPU)
|
|
{
|
|
cpu_id = 0;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
static bool
|
|
main_thread(RunType start_type, NdbTimer & timer)
|
|
{
|
|
bool insert_delete;
|
|
void * tmp;
|
|
|
|
tNoOfExecutorThreads = tNoOfThreads;
|
|
tNumNodes = get_node_count((Uint32)0);
|
|
|
|
if ((tNoOfExecutorThreads % tNumNodes) != 0)
|
|
{
|
|
printf("Number of executor threads (-t, now set to %u) must be multiple"
|
|
"of number nodes (number of nodes = %u)\n",
|
|
tNoOfExecutorThreads,
|
|
tNumNodes);
|
|
return false;
|
|
}
|
|
tNumThreadGroups = tNoOfExecutorThreads / tNumNodes;
|
|
printf("Number of nodes are %u, number of thread groups are %u\n",
|
|
tNumNodes,
|
|
tNumThreadGroups);
|
|
|
|
if (tNoOfDefinerThreads == 0)
|
|
{
|
|
tNoOfDefinerThreads = tNumThreadGroups;
|
|
}
|
|
if (tNumThreadGroups >= tNoOfDefinerThreads)
|
|
{
|
|
if ((tNumThreadGroups % tNoOfDefinerThreads) != 0)
|
|
{
|
|
printf("Number of definer threads (now %u), must be multiple of"
|
|
" number of thread groups (now %u)\n",
|
|
tNoOfDefinerThreads,
|
|
tNumThreadGroups);
|
|
return false;
|
|
}
|
|
tNumThreadGroupsPerDefinerThread = tNumThreadGroups / tNoOfDefinerThreads;
|
|
}
|
|
else
|
|
{
|
|
printf("The number of thread groups (now %u) must be a multiple of the"
|
|
" number of definer threads (now %u)\n",
|
|
tNumThreadGroups,
|
|
tNoOfDefinerThreads);
|
|
return false;
|
|
}
|
|
tNoOfThreads = tNoOfExecutorThreads + tNoOfDefinerThreads;
|
|
|
|
if (start_type == RunInsert ||
|
|
start_type == RunDelete)
|
|
{
|
|
insert_delete = true;
|
|
}
|
|
else
|
|
{
|
|
insert_delete = false;
|
|
}
|
|
|
|
create_definer_threads();
|
|
create_executor_threads();
|
|
|
|
wait_for_threads_ready(tNoOfThreads);
|
|
|
|
/**
|
|
* Start threads, start with execution threads to ensure they are
|
|
* up and running before definer threads starts sending data to
|
|
* them
|
|
*/
|
|
START_TIMER;
|
|
signal_definer_threads_to_start();
|
|
signal_executor_threads_to_start();
|
|
|
|
if (!insert_delete)
|
|
{
|
|
sleep(tWarmupTime);
|
|
tRunState = EXECUTING;
|
|
sleep(tExecutionTime);
|
|
tRunState = COOLDOWN;
|
|
sleep(tCooldownTime);
|
|
signal_definer_threads_to_stop();
|
|
}
|
|
wait_for_threads_ready(tNoOfDefinerThreads);
|
|
STOP_TIMER;
|
|
|
|
signal_executor_threads_to_stop();
|
|
wait_for_threads_ready(tNoOfThreads);
|
|
|
|
/**
|
|
* Now all threads are stopped and prepared to be destroyed,
|
|
* now start them just to destroy themselves
|
|
*/
|
|
signal_definer_threads_to_start();
|
|
signal_executor_threads_to_start();
|
|
|
|
for (Uint32 i = 0; i < tNoOfThreads; i++)
|
|
{
|
|
NdbThread_WaitFor(threadLife[i], &tmp);
|
|
NdbThread_Destroy(&threadLife[i]);
|
|
}
|
|
return true;
|
|
}
|
|
|
|
static NdbConnection*
|
|
get_trans_object(Uint32 first_key,
|
|
Uint32 second_key,
|
|
Ndb *my_ndb)
|
|
{
|
|
union {
|
|
Uint64 _align;
|
|
Uint32 Tkey32[2];
|
|
};
|
|
(void)_align;
|
|
|
|
Tkey32[0] = first_key;
|
|
Tkey32[1] = second_key;
|
|
Ndb::Key_part_ptr hint[2];
|
|
hint[0].ptr = Tkey32+0;
|
|
hint[0].len = 4;
|
|
hint[1].ptr = 0;
|
|
hint[1].len = 0;
|
|
|
|
return my_ndb->startTransaction(tables[0], hint);
|
|
}
|
|
|
|
static Ndb*
|
|
get_ndb_object(Uint32 my_thread_id)
|
|
{
|
|
Uint32 node_rel_id = my_thread_id % tNumNodes;
|
|
Ndb *my_ndb = new Ndb(g_cluster_connection[node_rel_id % tConnections],
|
|
"TEST_DB");
|
|
my_ndb->init(MAXPAR);
|
|
my_ndb->waitUntilReady(10000);
|
|
return my_ndb;
|
|
}
|
|
|
|
static void
|
|
insert_list(KEY_LIST_HEADER *list_header,
|
|
KEY_OPERATION *insert_op)
|
|
{
|
|
KEY_OPERATION *current_last = list_header->last_in_list;
|
|
insert_op->next_key_op = NULL;
|
|
list_header->last_in_list = insert_op;
|
|
if (current_last)
|
|
{
|
|
current_last->next_key_op = insert_op;
|
|
}
|
|
else
|
|
{
|
|
list_header->first_in_list = insert_op;
|
|
}
|
|
list_header->num_in_list++;
|
|
}
|
|
|
|
static KEY_OPERATION*
|
|
get_first_free(KEY_LIST_HEADER *list_header)
|
|
{
|
|
require(list_header->first_in_list);
|
|
KEY_OPERATION *key_op = list_header->first_in_list;
|
|
list_header->first_in_list = key_op->next_key_op;
|
|
list_header->num_in_list--;
|
|
if (!list_header->first_in_list)
|
|
{
|
|
list_header->last_in_list = NULL;
|
|
}
|
|
key_op->next_key_op = NULL;
|
|
return key_op;
|
|
}
|
|
|
|
static void
|
|
move_list(KEY_LIST_HEADER *src_list_header,
|
|
KEY_LIST_HEADER *dst_list_header)
|
|
{
|
|
KEY_OPERATION *last_completed_op = dst_list_header->last_in_list;
|
|
KEY_OPERATION *first_in_list = src_list_header->first_in_list;
|
|
if (!first_in_list)
|
|
return;
|
|
if (last_completed_op)
|
|
{
|
|
last_completed_op->next_key_op = first_in_list;
|
|
}
|
|
else
|
|
{
|
|
dst_list_header->first_in_list = first_in_list;
|
|
}
|
|
dst_list_header->last_in_list = src_list_header->last_in_list;
|
|
dst_list_header->num_in_list += src_list_header->num_in_list;
|
|
src_list_header->num_in_list = 0;
|
|
src_list_header->first_in_list = NULL;
|
|
src_list_header->last_in_list = NULL;
|
|
}
|
|
|
|
/**
|
|
* Retrieve a linked list of prepared operations. If no operations
|
|
* prepared we wait on a condition until operations are defined for
|
|
* us to execute.
|
|
*/
|
|
static void
|
|
receive_operations(THREAD_DATA *my_thread_data,
|
|
KEY_LIST_HEADER *list_header,
|
|
bool wait)
|
|
{
|
|
bool first = true;
|
|
KEY_LIST_HEADER *thread_list_header = &my_thread_data->list_header;
|
|
list_header->first_in_list = NULL;
|
|
list_header->last_in_list = NULL;
|
|
list_header->num_in_list = 0;
|
|
|
|
recheck:
|
|
NdbMutex_Lock(my_thread_data->transport_mutex);
|
|
while (!my_thread_data->stop &&
|
|
(first || thread_list_header->first_in_list))
|
|
{
|
|
move_list(thread_list_header, list_header);
|
|
if (list_header->first_in_list)
|
|
break;
|
|
NdbCondition_Wait(my_thread_data->transport_cond,
|
|
my_thread_data->transport_mutex);
|
|
}
|
|
NdbMutex_Unlock(my_thread_data->transport_mutex);
|
|
if (first && wait &&
|
|
list_header->num_in_list < ((tNoOfParallelTrans + 1) / 2))
|
|
{
|
|
/**
|
|
* We will wait for at least 1 milliseconds extra if we haven't yet
|
|
* received at least half of the number of records we desire to execute.
|
|
*/
|
|
NdbSleep_MicroSleep(1000);
|
|
first = false;
|
|
goto recheck;
|
|
}
|
|
}
|
|
|
|
static void
|
|
send_operations(Uint32 thread_id,
|
|
KEY_LIST_HEADER *list_header)
|
|
{
|
|
THREAD_DATA *recv_thread = &thread_data_array[thread_id];
|
|
|
|
NdbMutex_Lock(recv_thread->transport_mutex);
|
|
/**
|
|
* We are moving operations into the list, thus we need
|
|
* to wake any threads waiting for operations to execute.
|
|
*/
|
|
move_list(list_header,
|
|
&recv_thread->list_header);
|
|
NdbCondition_Signal(recv_thread->transport_cond);
|
|
NdbMutex_Unlock(recv_thread->transport_mutex);
|
|
}
|
|
|
|
static void
|
|
init_key_op_list(char *key_op_ptr,
|
|
KEY_LIST_HEADER *list_header,
|
|
Uint32 max_outstanding,
|
|
Uint32 my_thread_id,
|
|
RunType my_run_type)
|
|
{
|
|
KEY_OPERATION *key_op = NULL;
|
|
assert(max_outstanding > 0);
|
|
|
|
list_header->first_in_list = (KEY_OPERATION*)key_op_ptr;
|
|
for (Uint32 i = 0;
|
|
i < max_outstanding;
|
|
i++, key_op_ptr += sizeof(KEY_OPERATION))
|
|
{
|
|
key_op = (KEY_OPERATION*)key_op_ptr;
|
|
key_op->next_key_op = (KEY_OPERATION*)(key_op_ptr + sizeof(KEY_OPERATION));
|
|
key_op->definer_thread_id = my_thread_id;
|
|
key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
|
|
key_op->operation_type = my_run_type;
|
|
key_op->table_id = 0;
|
|
}
|
|
key_op->next_key_op = NULL; /* Last key operation */
|
|
list_header->last_in_list = key_op;
|
|
list_header->num_in_list = max_outstanding;
|
|
}
|
|
|
|
static Uint32
|
|
get_thread_id_for_record(Uint32 record_id,
|
|
Uint32 start_thread_id,
|
|
Ndb *my_ndb)
|
|
{
|
|
NdbConnection *trans = get_trans_object(record_id, record_id, my_ndb);
|
|
Uint32 node_id = trans->getConnectedNodeId();
|
|
trans->close();
|
|
Uint32 node_rel_id = get_node_relative_id((Uint32)0, node_id);
|
|
return (start_thread_id + node_rel_id);
|
|
}
|
|
|
|
void init_thread_id_mem(Uint32 *thread_id_mem,
|
|
Uint32 first_thread_id,
|
|
Uint32 first_record,
|
|
Uint32 total_records,
|
|
Ndb *my_ndb)
|
|
{
|
|
Uint32 thread_group = 0;
|
|
Uint32 start_thread_id = first_thread_id;
|
|
for (Uint32 record_id = first_record, i = 0;
|
|
i < total_records;
|
|
i++, record_id++)
|
|
{
|
|
thread_id_mem[i] = get_thread_id_for_record(record_id,
|
|
start_thread_id,
|
|
my_ndb);
|
|
thread_group++;
|
|
start_thread_id += tNumNodes;
|
|
if (thread_group == tNumThreadGroupsPerDefinerThread)
|
|
{
|
|
thread_group = 0;
|
|
start_thread_id = first_thread_id;
|
|
}
|
|
}
|
|
}
|
|
|
|
static bool
|
|
check_for_outstanding(Uint32 *thread_state)
|
|
{
|
|
for (Uint32 i = 0; i < tNoOfExecutorThreads; i++)
|
|
{
|
|
if (thread_state[i])
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
static void
|
|
update_thread_state(KEY_LIST_HEADER *list_header,
|
|
Uint32 *thread_state)
|
|
{
|
|
KEY_OPERATION *key_op = list_header->first_in_list;
|
|
|
|
while (key_op)
|
|
{
|
|
thread_state[key_op->executor_thread_id]--;
|
|
key_op->executor_thread_id = MAX_EXECUTOR_THREADS;
|
|
key_op = key_op->next_key_op;
|
|
}
|
|
}
|
|
|
|
static void
|
|
wait_until_all_completed(THREAD_DATA *my_thread_data,
|
|
Uint32 *thread_state,
|
|
KEY_LIST_HEADER *free_list_header)
|
|
{
|
|
KEY_LIST_HEADER list_header;
|
|
bool outstanding = true;
|
|
while (outstanding && !my_thread_data->stop)
|
|
{
|
|
receive_operations(my_thread_data, &list_header, false);
|
|
update_thread_state(&list_header, thread_state);
|
|
move_list(&list_header, free_list_header);
|
|
outstanding = check_for_outstanding(thread_state);
|
|
}
|
|
}
|
|
|
|
static Uint32
|
|
prepare_operations(Uint32 *thread_id_mem,
|
|
KEY_LIST_HEADER *free_list_header,
|
|
Uint32 *thread_state,
|
|
Uint32 first_record_to_define,
|
|
Uint32 num_records_to_define,
|
|
Uint32 first_record,
|
|
Uint32 last_record,
|
|
Uint32 max_per_thread)
|
|
{
|
|
KEY_LIST_HEADER thread_list_headers[MAX_EXECUTOR_THREADS];
|
|
Uint32 record_id, i, num_records;
|
|
|
|
init_list_headers(&thread_list_headers[0], tNoOfExecutorThreads);
|
|
for (record_id = first_record_to_define, i = 0;
|
|
record_id <= last_record && i < num_records_to_define;
|
|
record_id++, i++)
|
|
{
|
|
KEY_OPERATION *define_op = get_first_free(free_list_header);
|
|
Uint32 thread_id = thread_id_mem[record_id - first_record];
|
|
define_op->first_key = record_id;
|
|
define_op->second_key = record_id;
|
|
define_op->table_id = record_id % tNumTables;
|
|
define_op->executor_thread_id = thread_id;
|
|
thread_state[thread_id]++;
|
|
KEY_LIST_HEADER *thread_list_header = &thread_list_headers[thread_id];
|
|
insert_list(thread_list_header, define_op);
|
|
if (thread_list_header->num_in_list >= max_per_thread)
|
|
{
|
|
/**
|
|
* One thread has max number of records, we won't define any
|
|
* more to keep the code simple.
|
|
*/
|
|
i++;
|
|
break;
|
|
}
|
|
}
|
|
num_records = i;
|
|
for (i = 0; i < tNoOfExecutorThreads; i++)
|
|
{
|
|
KEY_LIST_HEADER *thread_list_header = &thread_list_headers[i];
|
|
if (thread_list_header->num_in_list)
|
|
{
|
|
send_operations(tNoOfDefinerThreads + i, thread_list_header);
|
|
}
|
|
}
|
|
return num_records;
|
|
}
|
|
|
|
/**
|
|
* Each definer thread works with one or more thread groups of executor
|
|
* threads. Each executor thread handles transactions towards one node
|
|
* in the cluster, one thread group is a set of threads handling all nodes
|
|
* in the cluster. One definer thread can handle one or more such thread
|
|
* groups. This means that we will only send operations to those threads
|
|
* and to no other threads. So the different definer threads will work
|
|
* quite independent of each other.
|
|
*
|
|
* Each executor thread will receive input from one definer thread. So
|
|
* there is a 1-n mapping between definer threads and executor threads.
|
|
*/
|
|
static void*
|
|
definer_thread(void *data)
|
|
{
|
|
THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
|
|
Uint32 my_thread_id = my_thread_data->thread_id;
|
|
RunType run_type = tRunType;
|
|
Uint32 thread_state[MAX_EXECUTOR_THREADS];
|
|
Uint32 max_outstanding = (tNoOfExecutorThreads * tNoOfParallelTrans) /
|
|
tNoOfDefinerThreads;
|
|
Uint32 max_per_thread = 1000;
|
|
Uint32 first_thread_id = my_thread_id *
|
|
(tNumNodes * tNumThreadGroupsPerDefinerThread);
|
|
Uint32 total_records = max_outstanding * tNoOfTransactions;
|
|
Uint32 first_record = total_records * my_thread_id;
|
|
Uint32 my_last_record = first_record + total_records - 1;
|
|
Uint32 current_record = first_record;
|
|
KEY_LIST_HEADER free_list_header;
|
|
void *key_op_mem = malloc(sizeof(KEY_OPERATION) * max_outstanding);
|
|
Uint32 *thread_id_mem = (Uint32*)malloc(total_records*sizeof(Uint32));
|
|
memset((char*)&thread_state[0], 0, sizeof(thread_state));
|
|
|
|
init_key_op_list((char*)key_op_mem,
|
|
&free_list_header,
|
|
max_outstanding,
|
|
my_thread_id,
|
|
run_type);
|
|
Ndb *my_ndb = get_ndb_object(my_thread_id);
|
|
init_thread_id_mem(thread_id_mem,
|
|
first_thread_id,
|
|
first_record,
|
|
total_records,
|
|
my_ndb);
|
|
delete my_ndb;
|
|
ThreadExecutions[my_thread_id] = 0;
|
|
ThreadExecutionRounds[my_thread_id] = 0;
|
|
signal_thread_ready_wait_for_start(my_thread_data);
|
|
while (!my_thread_data->stop)
|
|
{
|
|
Uint32 defined_ops = prepare_operations(thread_id_mem,
|
|
&free_list_header,
|
|
&thread_state[0],
|
|
current_record,
|
|
max_outstanding,
|
|
first_record,
|
|
my_last_record,
|
|
max_per_thread);
|
|
current_record += defined_ops;
|
|
if (defined_ops)
|
|
{
|
|
wait_until_all_completed(my_thread_data,
|
|
&thread_state[0],
|
|
&free_list_header);
|
|
}
|
|
if (current_record > my_last_record)
|
|
{
|
|
if (run_type != RunRead &&
|
|
run_type != RunUpdate)
|
|
{
|
|
/**
|
|
* Inserts and deletes are done when first round is
|
|
* completed. Reads and updates proceed until time is
|
|
* completed.
|
|
*/
|
|
break;
|
|
}
|
|
current_record = first_record;
|
|
}
|
|
}
|
|
signal_thread_ready_wait_for_start(my_thread_data);
|
|
free(key_op_mem);
|
|
free(thread_id_mem);
|
|
destroy_thread_data(my_thread_data);
|
|
return NULL;
|
|
}
|
|
|
|
/**
|
|
* This method receives a linked list of key operations and executes
|
|
* all of them.
|
|
*
|
|
* Return Value: >= 0 means successful completion of this many operations
|
|
* -1 Failure, stop test
|
|
*/
|
|
static int
|
|
execute_operations(THREAD_DATA *my_thread_data,
|
|
char *record,
|
|
Ndb* my_ndb,
|
|
KEY_OPERATION *key_op)
|
|
{
|
|
NdbConnection* ndb_conn_array[MAXPAR];
|
|
Uint32 num_ops = 0;
|
|
|
|
while (key_op)
|
|
{
|
|
ndb_conn_array[num_ops] = get_trans_object(key_op->first_key,
|
|
key_op->second_key,
|
|
my_ndb);
|
|
if (ndb_conn_array[num_ops] == NULL)
|
|
{
|
|
error_handler(my_ndb->getNdbError());
|
|
ndbout << endl << "Unable to recover in " << num_ops;
|
|
ndbout << " op! Quitting now" << endl ;
|
|
return -1;
|
|
}
|
|
//-------------------------------------------------------
|
|
// Define the operation, but do not execute it yet.
|
|
//-------------------------------------------------------
|
|
defineNdbRecordOperation(record,
|
|
key_op->table_id,
|
|
ndb_conn_array[num_ops],
|
|
(StartType)key_op->operation_type,
|
|
key_op->first_key,
|
|
key_op->second_key,
|
|
my_thread_data);
|
|
|
|
ndb_conn_array[num_ops]->executeAsynchPrepare(Commit,
|
|
&executeCallback,
|
|
(void*)&ndb_conn_array[num_ops]);
|
|
num_ops++;
|
|
key_op = key_op->next_key_op;
|
|
}
|
|
if (num_ops == 0)
|
|
return 0;
|
|
|
|
/**
|
|
* Now execute each defined operation and wait for all of them to
|
|
* complete.
|
|
*/
|
|
int Tcomp = my_ndb->sendPollNdb(3000,
|
|
num_ops,
|
|
tSendForce);
|
|
if (Tcomp != (int)num_ops &&
|
|
my_ndb->getNdbError().code != 0)
|
|
{
|
|
/* Error handling */
|
|
if (error_count > 100)
|
|
return -1;
|
|
|
|
error_count++;
|
|
ndbout << "error = " << my_ndb->getNdbError().code << endl;
|
|
}
|
|
return Tcomp;
|
|
}
|
|
|
|
static void
|
|
report_back_operations(KEY_OPERATION *first_defined_op)
|
|
{
|
|
KEY_LIST_HEADER thread_list_header[MAX_DEFINER_THREADS];
|
|
KEY_OPERATION *next_op, *executed_op;
|
|
|
|
init_list_headers(&thread_list_header[0], tNoOfDefinerThreads);
|
|
executed_op = first_defined_op;
|
|
while (executed_op)
|
|
{
|
|
next_op = executed_op->next_key_op;
|
|
insert_list(&thread_list_header[executed_op->definer_thread_id],
|
|
executed_op);
|
|
executed_op = next_op;
|
|
}
|
|
for (Uint32 i = 0; i < tNoOfDefinerThreads; i++)
|
|
{
|
|
if (thread_list_header[i].first_in_list)
|
|
{
|
|
send_operations(i, &thread_list_header[i]);
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* This is the main function of the executor threads, these threads
|
|
* receive linked lists of operations to execute from the definer
|
|
* threads. The definer threads stops these threads by simply
|
|
* sending a stop operation.
|
|
*/
|
|
static void*
|
|
executor_thread(void *data)
|
|
{
|
|
THREAD_DATA *my_thread_data = (THREAD_DATA*)data;
|
|
Uint32 my_thread_id = my_thread_data->thread_id;
|
|
Uint64 exec_count = 0;
|
|
Uint64 executions = 0;
|
|
Uint32 error_flag = false;
|
|
int ret_code;
|
|
KEY_LIST_HEADER list_header;
|
|
|
|
Ndb *my_ndb = get_ndb_object(my_thread_id);
|
|
ThreadExecutions[my_thread_id] = 0;
|
|
ThreadExecutionRounds[my_thread_id] = 0;
|
|
|
|
signal_thread_ready_wait_for_start(my_thread_data);
|
|
|
|
while (!my_thread_data->stop)
|
|
{
|
|
receive_operations(my_thread_data, &list_header, !tImmediate);
|
|
if (list_header.num_in_list == 0)
|
|
{
|
|
break;
|
|
}
|
|
ret_code = 0;
|
|
if (!error_flag)
|
|
{
|
|
/* Ignore to execute after errors to simplify error handling */
|
|
ret_code = execute_operations(my_thread_data,
|
|
my_thread_data->record,
|
|
my_ndb,
|
|
list_header.first_in_list);
|
|
}
|
|
report_back_operations(list_header.first_in_list);
|
|
if (ret_code < 0)
|
|
{
|
|
ndbout_c("executor thread id = %u received error after %u executions",
|
|
my_thread_id,
|
|
(Uint32)executions);
|
|
error_flag = true;
|
|
}
|
|
else if (!error_flag &&
|
|
(tRunType == RunInsert ||
|
|
tRunType == RunDelete ||
|
|
tRunState == EXECUTING))
|
|
{
|
|
executions++;
|
|
exec_count += (Uint64)ret_code;
|
|
}
|
|
}
|
|
|
|
ThreadExecutions[my_thread_id] = exec_count;
|
|
ThreadExecutionRounds[my_thread_id] = executions;
|
|
signal_thread_ready_wait_for_start(my_thread_data);
|
|
delete my_ndb;
|
|
destroy_thread_data(my_thread_data);
|
|
return NULL;
|
|
}
|
|
|
|
/* End NEW Module */
|
|
|
|
static int
|
|
add_receive_cpus(Uint16 first,
|
|
Uint16 last,
|
|
Uint32 *num_cpus,
|
|
Uint16 *cpu_array)
|
|
{
|
|
Uint32 num_added_cpus;
|
|
Uint32 i;
|
|
|
|
if (last < first)
|
|
goto number_error;
|
|
num_added_cpus = last - first + 1;
|
|
|
|
if ((*num_cpus) + num_added_cpus > NDB_MAX_RECEIVE_CPUS)
|
|
goto too_many_error;
|
|
|
|
for (i = 0 ; i < num_added_cpus; i++)
|
|
{
|
|
cpu_array[numberOfReceiveCPU + i] = Uint16(first + i);
|
|
}
|
|
(*num_cpus) += num_added_cpus;
|
|
return 0;
|
|
|
|
number_error:
|
|
ndbout_c("Range error in -receive_cpus, first-last with bigger first");
|
|
return 1;
|
|
|
|
too_many_error:
|
|
ndbout_c("More than %u CPUs specified", Uint32(NDB_MAX_RECEIVE_CPUS));
|
|
return 1;
|
|
}
|
|
|
|
/**
|
|
* Very simple parser to allow for specifying the receive thread CPU to be used
|
|
* by NDB cluster connections. One CPU per NDB cluster connection should always
|
|
* be specified, no more, no less.
|
|
*/
|
|
static int
|
|
read_cpus(const char *str, Uint32 *num_cpus, Uint16 *cpu_array)
|
|
{
|
|
Uint32 i;
|
|
bool range_found = false;
|
|
bool number_found = false;
|
|
Uint16 start = 0;
|
|
Uint32 current_number = 0;
|
|
Uint32 len = (Uint32)strlen(str);
|
|
|
|
for (i = 0; i < len + 1; i++)
|
|
{
|
|
char c;
|
|
if (i == len)
|
|
c = ',';
|
|
else
|
|
c = str[i];
|
|
|
|
if (c == ',')
|
|
{
|
|
Uint16 end;
|
|
if (!number_found)
|
|
goto parse_error;
|
|
end = Uint16(current_number);
|
|
if (range_found)
|
|
{
|
|
if (add_receive_cpus(start, end, num_cpus, cpu_array) != 0)
|
|
return 1;
|
|
}
|
|
else
|
|
{
|
|
if (add_receive_cpus(end, end, num_cpus, cpu_array) != 0)
|
|
return 1;
|
|
}
|
|
range_found = false;
|
|
number_found = false;
|
|
current_number = 0;
|
|
}
|
|
else if (c == '-')
|
|
{
|
|
if (!number_found)
|
|
goto parse_error;
|
|
ndbout_c("- found");
|
|
range_found = true;
|
|
number_found = false;
|
|
start = Uint16(current_number);
|
|
current_number = 0;
|
|
}
|
|
else if (c >= '0' && c <= '9')
|
|
{
|
|
Uint32 number = c - '0';
|
|
current_number *= 10;
|
|
current_number += number;
|
|
if (current_number > 65536)
|
|
goto too_large_number_error;
|
|
number_found = true;
|
|
}
|
|
else
|
|
goto parse_error;
|
|
}
|
|
return 0;
|
|
|
|
parse_error:
|
|
ndbout_c("Invalid specification in receive_cpus");
|
|
return 1;
|
|
|
|
too_large_number_error:
|
|
ndbout_c("Number larger than 65536 not allowed for CPU id");
|
|
return 1;
|
|
}
|
|
|
|
static
|
|
int
|
|
readArguments(int argc, char** argv){
|
|
|
|
int i = 1;
|
|
while (argc > 1){
|
|
if (strcmp(argv[i], "-t") == 0){
|
|
tNoOfThreads = atoi(argv[i+1]);
|
|
if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)){
|
|
ndbout_c("Invalid no of threads");
|
|
return -1;
|
|
}
|
|
}
|
|
else if (strcmp(argv[i], "-d") == 0)
|
|
{
|
|
tNoOfDefinerThreads = atoi(argv[i+1]);
|
|
if (tNoOfDefinerThreads > NDB_MAXTHREADS)
|
|
{
|
|
ndbout_c("Invalid no of definer threads");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-p") == 0){
|
|
tNoOfParallelTrans = atoi(argv[i+1]);
|
|
if ((tNoOfParallelTrans < 1) || (tNoOfParallelTrans > MAXPAR)){
|
|
ndbout_c("Invalid no of parallel transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-num_tables") == 0) {
|
|
tNumTables = atoi(argv[i+1]);
|
|
if ((tNumTables < 1) || (tNumTables > MAXTABLES))
|
|
{
|
|
ndbout_c("Invalid number of tables");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-num_indexes") == 0) {
|
|
tNumIndexes = atoi(argv[i+1]);
|
|
if ((tNumIndexes > MAX_INDEXES) || ((tNumIndexes + 1) >= tNoOfAttributes))
|
|
{
|
|
ndbout_c("Invalid number of indexes per table");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-load_factor") == 0){
|
|
tLoadFactor = atoi(argv[i+1]);
|
|
if ((tLoadFactor < 40) || (tLoadFactor > 99)){
|
|
ndbout_c("Invalid load factor");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-c") == 0) {
|
|
tNoOfOpsPerTrans = atoi(argv[i+1]);
|
|
if (tNoOfOpsPerTrans < 1){
|
|
ndbout_c("Invalid no of operations per transaction");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-o") == 0) {
|
|
tNoOfTransactions = atoi(argv[i+1]);
|
|
if (tNoOfTransactions < 1){
|
|
ndbout_c("Invalid no of transactions");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-a") == 0){
|
|
tNoOfAttributes = atoi(argv[i+1]);
|
|
if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)){
|
|
ndbout_c("Invalid no of attributes");
|
|
return -1;
|
|
}
|
|
if ((tNumIndexes + 1) >= tNoOfAttributes)
|
|
{
|
|
ndbout_c("Invalid number of indexes per table");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-n") == 0){
|
|
theStdTableNameFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-l") == 0){
|
|
tNoOfLoops = atoi(argv[i+1]);
|
|
if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)){
|
|
ndbout_c("Invalid no of loops");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-s") == 0){
|
|
tAttributeSize = atoi(argv[i+1]);
|
|
if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)){
|
|
ndbout_c("Invalid attributes size");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-local") == 0){
|
|
tLocal = atoi(argv[i+1]);
|
|
if (tLocal < 1 || (tLocal > 3)){
|
|
ndbout_c("Invalid local value, only 1,2 or 3 allowed");
|
|
return -1;
|
|
}
|
|
startTransGuess = true;
|
|
} else if (strcmp(argv[i], "-simple") == 0){
|
|
theSimpleFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-adaptive") == 0){
|
|
tSendForce = 0;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-force") == 0){
|
|
tSendForce = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-non_adaptive") == 0){
|
|
tSendForce = 2;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-write") == 0){
|
|
theWriteFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-dirty") == 0){
|
|
theDirtyFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-test") == 0){
|
|
theTestFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-no_table_create") == 0){
|
|
theTableCreateFlag = 1;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-temp") == 0){
|
|
tempTable = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-no_hint") == 0){
|
|
startTransGuess = false;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-ndbrecord") == 0){
|
|
tNdbRecord = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-r") == 0){
|
|
tExtraReadLoop = atoi(argv[i+1]);
|
|
} else if (strcmp(argv[i], "-con") == 0)
|
|
{
|
|
tConnections = atoi(argv[i+1]);
|
|
if (tConnections > 64)
|
|
{
|
|
ndbout_c("Max 64 NDB cluster connections can be used");
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-insert") == 0){
|
|
setAggregateRun();
|
|
tRunType = RunInsert;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-read") == 0){
|
|
setAggregateRun();
|
|
tRunType = RunRead;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-update") == 0){
|
|
setAggregateRun();
|
|
tRunType = RunUpdate;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-delete") == 0){
|
|
setAggregateRun();
|
|
tRunType = RunDelete;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-create_table") == 0){
|
|
tRunType = RunCreateTable;
|
|
argc++;
|
|
i--;
|
|
}
|
|
else if (strcmp(argv[i], "-new") == 0)
|
|
{
|
|
tNew = true;
|
|
tNdbRecord = true;
|
|
argc++;
|
|
i--;
|
|
}
|
|
else if (strcmp(argv[i], "-immediate") == 0)
|
|
{
|
|
tImmediate = true;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-drop_table") == 0){
|
|
tRunType = RunDropTable;
|
|
argc++;
|
|
i--;
|
|
} else if (strcmp(argv[i], "-warmup_time") == 0){
|
|
tWarmupTime = atoi(argv[i+1]);
|
|
} else if (strcmp(argv[i], "-execution_time") == 0){
|
|
tExecutionTime = atoi(argv[i+1]);
|
|
} else if (strcmp(argv[i], "-cooldown_time") == 0){
|
|
tCooldownTime = atoi(argv[i+1]);
|
|
} else if (strcmp(argv[i], "-table") == 0){
|
|
tStdTableNum = atoi(argv[i+1]);
|
|
theStdTableNameFlag = 1;
|
|
} else if (strcmp(argv[i], "-receive_cpus") == 0) {
|
|
if (read_cpus(argv[i+1],
|
|
&numberOfReceiveCPU,
|
|
&receiveCPUArray[0]) != 0)
|
|
{
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-definer_cpus") == 0) {
|
|
if (read_cpus(argv[i+1],
|
|
&numberOfDefinerCPU,
|
|
&definerCPUArray[0]) != 0)
|
|
{
|
|
return -1;
|
|
}
|
|
} else if (strcmp(argv[i], "-executor_cpus") == 0) {
|
|
if (read_cpus(argv[i+1],
|
|
&numberOfExecutorCPU,
|
|
&executorCPUArray[0]) != 0)
|
|
{
|
|
return -1;
|
|
}
|
|
} else {
|
|
ndbout_c("No such parameter: %s", argv[i]);
|
|
return -1;
|
|
}
|
|
|
|
argc -= 2;
|
|
i = i + 2;
|
|
}//while
|
|
if (tLocal > 0) {
|
|
if (tNoOfOpsPerTrans != 1) {
|
|
ndbout_c("Not valid to have more than one op per trans with local");
|
|
}//if
|
|
if (startTransGuess == false) {
|
|
ndbout_c("Not valid to use no_hint with local");
|
|
}//if
|
|
}//if
|
|
if (numberOfReceiveCPU != 0 &&
|
|
int(numberOfReceiveCPU) != tConnections)
|
|
{
|
|
ndbout_c("One CPU per NDB connection is used, inconsistency in -receive_cpus");
|
|
return -1;
|
|
}
|
|
return 0;
|
|
}
|
|
|
|
static
|
|
void
|
|
input_error(){
|
|
ndbout_c("FLEXASYNCH");
|
|
ndbout_c(" Perform benchmark of insert, update and delete transactions");
|
|
ndbout_c(" ");
|
|
ndbout_c("Arguments:");
|
|
ndbout_c(" -t Number of threads to start, default 1");
|
|
ndbout_c(" -p Number of parallel transactions per thread, default 32");
|
|
ndbout_c(" -o Number of transactions per loop, default 500");
|
|
ndbout_c(" -l Number of loops to run, default 1, 0=infinite");
|
|
ndbout_c(" -load_factor Number Load factor in index in percent (40 -> 99)");
|
|
ndbout_c(" -a Number of attributes, default 25");
|
|
ndbout_c(" -c Number of operations per transaction");
|
|
ndbout_c(" -s Size of each attribute, default 1 ");
|
|
ndbout_c(" (PK is always of size 1, independent of this value)");
|
|
ndbout_c(" -simple Use simple read to read from database");
|
|
ndbout_c(" -dirty Use dirty read to read from database");
|
|
ndbout_c(" -write Use writeTuple in insert and update");
|
|
ndbout_c(" -n Use standard table names");
|
|
ndbout_c(" -no_table_create Don't create tables in db");
|
|
ndbout_c(" -temp Create table(s) without logging");
|
|
ndbout_c(" -no_hint Don't give hint on where to execute transaction coordinator");
|
|
ndbout_c(" -adaptive Use adaptive send algorithm (default)");
|
|
ndbout_c(" -force Force send when communicating");
|
|
ndbout_c(" -non_adaptive Send at a 10 millisecond interval");
|
|
ndbout_c(" -local 1 = each thread its own node, 2 = round robin on node per parallel trans 3 = random node per parallel trans");
|
|
ndbout_c(" -ndbrecord Use NDB Record");
|
|
ndbout_c(" -r Number of extra loops");
|
|
ndbout_c(" -insert Only run inserts on standard table");
|
|
ndbout_c(" -read Only run reads on standard table");
|
|
ndbout_c(" -update Only run updates on standard table");
|
|
ndbout_c(" -delete Only run deletes on standard table");
|
|
ndbout_c(" -create_table Only run Create Table of standard table");
|
|
ndbout_c(" -drop_table Only run Drop Table on standard table");
|
|
ndbout_c(" -warmup_time Warmup Time before measurement starts");
|
|
ndbout_c(" -execution_time Execution Time where measurement is done");
|
|
ndbout_c(" -cooldown_time Cooldown time after measurement completed");
|
|
ndbout_c(" -table Number of standard table, default 0");
|
|
ndbout_c(" -num_tables Number of tables in benchmark, default 1");
|
|
ndbout_c(" -num_indexes Number of ordered indexes per table in benchmark, default 0");
|
|
ndbout_c(" -receive_cpus A set of CPUs for receive threads, one per connection,"
|
|
" comma separated list with ranges, e.g. 0-2,4");
|
|
}
|
|
|
|
static void
|
|
run_old_flexAsynch(ThreadNdb *pThreadData,
|
|
NdbTimer & timer)
|
|
{
|
|
int tLoops=0;
|
|
/****************************************************************
|
|
* Create NDB objects. *
|
|
****************************************************************/
|
|
resetThreads();
|
|
Uint32 cpu_id = 0;
|
|
for (Uint32 i = 0; i < tNoOfThreads ; i++)
|
|
{
|
|
pThreadData[i].ThreadNo = i;
|
|
threadLife[i] = NdbThread_Create(threadLoop,
|
|
(void**)&pThreadData[i],
|
|
32768,
|
|
"flexAsynchThread",
|
|
NDB_THREAD_PRIO_LOW);
|
|
if (numberOfExecutorCPU != 0)
|
|
{
|
|
Ndb_LockCPU(threadLife[i], Uint32(executorCPUArray[cpu_id]));
|
|
cpu_id++;
|
|
if (cpu_id == numberOfExecutorCPU)
|
|
{
|
|
cpu_id = 0;
|
|
}
|
|
}
|
|
}//for
|
|
ndbout << endl << "All NDB objects and table created" << endl << endl;
|
|
int noOfTransacts = tNoOfParallelTrans*tNoOfTransactions*tNoOfThreads;
|
|
/****************************************************************
|
|
* Execute program. *
|
|
****************************************************************/
|
|
|
|
for (;;)
|
|
{
|
|
|
|
int loopCount = tLoops + 1 ;
|
|
ndbout << endl << "Loop # " << loopCount << endl << endl ;
|
|
|
|
/****************************************************************
|
|
* Perform inserts. *
|
|
****************************************************************/
|
|
|
|
failed = 0 ;
|
|
if (tRunType == RunAll || tRunType == RunInsert)
|
|
{
|
|
ndbout << "Executing inserts" << endl;
|
|
START_TIMER;
|
|
execute(stInsert);
|
|
STOP_TIMER;
|
|
}
|
|
if (tRunType == RunAll)
|
|
{
|
|
a_i.addObservation((1000ULL*noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
|
|
PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
|
|
|
|
if (0 < failed)
|
|
{
|
|
int i = retry_opt ;
|
|
int ci = 1 ;
|
|
while (0 < failed && 0 < i)
|
|
{
|
|
ndbout << failed << " of the transactions returned errors!"
|
|
<< endl << endl;
|
|
ndbout << "Attempting to redo the failed transactions now..."
|
|
<< endl ;
|
|
ndbout << "Redo attempt " << ci <<" out of " << retry_opt
|
|
<< endl << endl;
|
|
failed = 0 ;
|
|
START_TIMER;
|
|
execute(stInsert);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("insert", noOfTransacts, tNoOfOpsPerTrans);
|
|
i-- ;
|
|
ci++;
|
|
}
|
|
if (0 == failed )
|
|
{
|
|
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
|
|
}
|
|
else
|
|
{
|
|
ndbout << endl <<"Redo attempt failed, moving on now..." << endl
|
|
<< endl;
|
|
}//if
|
|
}//if
|
|
}//if
|
|
/****************************************************************
|
|
* Perform read. *
|
|
****************************************************************/
|
|
|
|
failed = 0 ;
|
|
|
|
if (tRunType == RunAll || tRunType == RunRead)
|
|
{
|
|
for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
|
|
{
|
|
ndbout << "Executing reads" << endl;
|
|
START_TIMER;
|
|
execute(stRead);
|
|
STOP_TIMER;
|
|
if (tRunType == RunAll)
|
|
{
|
|
a_r.addObservation((1000ULL * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
|
|
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
|
|
}//if
|
|
}//for
|
|
}//if
|
|
|
|
if (tRunType == RunAll)
|
|
{
|
|
if (0 < failed)
|
|
{
|
|
int i = retry_opt ;
|
|
int cr = 1;
|
|
while (0 < failed && 0 < i)
|
|
{
|
|
ndbout << failed << " of the transactions returned errors!"<<endl ;
|
|
ndbout << endl;
|
|
ndbout <<"Attempting to redo the failed transactions now..." << endl;
|
|
ndbout << endl;
|
|
ndbout <<"Redo attempt " << cr <<" out of ";
|
|
ndbout << retry_opt << endl << endl;
|
|
failed = 0 ;
|
|
START_TIMER;
|
|
execute(stRead);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
|
|
i-- ;
|
|
cr++ ;
|
|
}//while
|
|
if (0 == failed )
|
|
{
|
|
ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
|
|
}
|
|
else
|
|
{
|
|
ndbout << endl <<"Redo attempt failed, moving on now..." << endl << endl ;
|
|
}//if
|
|
}//if
|
|
}//if
|
|
|
|
|
|
/****************************************************************
|
|
* Perform update. *
|
|
****************************************************************/
|
|
|
|
failed = 0 ;
|
|
|
|
if (tRunType == RunAll || tRunType == RunUpdate)
|
|
{
|
|
ndbout << "Executing updates" << endl;
|
|
START_TIMER;
|
|
execute(stUpdate);
|
|
STOP_TIMER;
|
|
}//if
|
|
if (tRunType == RunAll)
|
|
{
|
|
a_u.addObservation((1000ULL * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
|
|
PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans) ;
|
|
|
|
if (0 < failed)
|
|
{
|
|
int i = retry_opt ;
|
|
int cu = 1 ;
|
|
while (0 < failed && 0 < i)
|
|
{
|
|
ndbout << failed << " of the transactions returned errors!"<<endl ;
|
|
ndbout << endl;
|
|
ndbout <<"Attempting to redo the failed transactions now..." << endl;
|
|
ndbout << endl <<"Redo attempt " << cu <<" out of ";
|
|
ndbout << retry_opt << endl << endl;
|
|
failed = 0 ;
|
|
START_TIMER;
|
|
execute(stUpdate);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("update", noOfTransacts, tNoOfOpsPerTrans);
|
|
i-- ;
|
|
cu++ ;
|
|
}//while
|
|
if (0 == failed )
|
|
{
|
|
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
|
|
}
|
|
else
|
|
{
|
|
ndbout << endl;
|
|
ndbout <<"Redo attempt failed, moving on now..." << endl << endl;
|
|
}//if
|
|
}//if
|
|
}//if
|
|
|
|
/****************************************************************
|
|
* Perform read. *
|
|
****************************************************************/
|
|
|
|
failed = 0 ;
|
|
|
|
if (tRunType == RunAll)
|
|
{
|
|
for (int ll = 0; ll < 1 + tExtraReadLoop; ll++)
|
|
{
|
|
ndbout << "Executing reads" << endl;
|
|
START_TIMER;
|
|
execute(stRead);
|
|
STOP_TIMER;
|
|
a_r.addObservation((1000ULL * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
|
|
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
|
|
}
|
|
|
|
if (0 < failed)
|
|
{
|
|
int i = retry_opt ;
|
|
int cr2 = 1 ;
|
|
while (0 < failed && 0 < i)
|
|
{
|
|
ndbout << failed << " of the transactions returned errors!"<<endl ;
|
|
ndbout << endl;
|
|
ndbout <<"Attempting to redo the failed transactions now..." << endl;
|
|
ndbout << endl <<"Redo attempt " << cr2 <<" out of ";
|
|
ndbout << retry_opt << endl << endl;
|
|
failed = 0 ;
|
|
START_TIMER;
|
|
execute(stRead);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
|
|
i-- ;
|
|
cr2++ ;
|
|
}//while
|
|
if (0 == failed )
|
|
{
|
|
ndbout << endl <<"Redo attempt succeeded" << endl << endl;
|
|
}
|
|
else
|
|
{
|
|
ndbout << endl;
|
|
ndbout << "Redo attempt failed, moving on now..." << endl << endl;
|
|
}//if
|
|
}//if
|
|
}//if
|
|
|
|
|
|
/****************************************************************
|
|
* Perform delete. *
|
|
****************************************************************/
|
|
|
|
failed = 0 ;
|
|
|
|
if (tRunType == RunAll || tRunType == RunDelete)
|
|
{
|
|
ndbout << "Executing deletes" << endl;
|
|
START_TIMER;
|
|
execute(stDelete);
|
|
STOP_TIMER;
|
|
}//if
|
|
if (tRunType == RunAll)
|
|
{
|
|
a_d.addObservation((1000ULL * noOfTransacts * tNoOfOpsPerTrans) / timer.elapsedTime());
|
|
PRINT_TIMER("delete", noOfTransacts, tNoOfOpsPerTrans);
|
|
|
|
if (0 < failed) {
|
|
int i = retry_opt ;
|
|
int cd = 1 ;
|
|
while (0 < failed && 0 < i)
|
|
{
|
|
ndbout << failed << " of the transactions returned errors!"<< endl ;
|
|
ndbout << endl;
|
|
ndbout <<"Attempting to redo the failed transactions now:" << endl ;
|
|
ndbout << endl <<"Redo attempt " << cd <<" out of ";
|
|
ndbout << retry_opt << endl << endl;
|
|
failed = 0 ;
|
|
START_TIMER;
|
|
execute(stDelete);
|
|
STOP_TIMER;
|
|
PRINT_TIMER("read", noOfTransacts, tNoOfOpsPerTrans);
|
|
i-- ;
|
|
cd++ ;
|
|
}//while
|
|
if (0 == failed )
|
|
{
|
|
ndbout << endl <<"Redo attempt succeeded" << endl << endl ;
|
|
}
|
|
else
|
|
{
|
|
ndbout << endl;
|
|
ndbout << "Redo attempt failed, moving on now..." << endl << endl;
|
|
}//if
|
|
}//if
|
|
}//if
|
|
|
|
tLoops++;
|
|
ndbout << "--------------------------------------------------" << endl;
|
|
|
|
if (tNoOfLoops != 0)
|
|
{
|
|
if (tNoOfLoops <= tLoops)
|
|
break ;
|
|
}
|
|
}//for
|
|
|
|
execute(stStop);
|
|
void * tmp;
|
|
for (Uint32 i = 0; i < tNoOfThreads; i++)
|
|
{
|
|
NdbThread_WaitFor(threadLife[i], &tmp);
|
|
NdbThread_Destroy(&threadLife[i]);
|
|
}
|
|
}
|
|
template class Vector<NdbDictionary::RecordSpecification>;
|