/*
   Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License, version 2.0,
   as published by the Free Software Foundation.

   This program is also distributed with certain software (including
   but not limited to OpenSSL) that is licensed under separate terms,
   as designated in a particular file or component or in included license
   documentation.  The authors of MySQL hereby grant you an additional
   permission to link the program and your derivative works with the
   separately licensed software that they have included with MySQL.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License, version 2.0, for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
*/
|
|
|
|
//****************************************************************************
//
//  NAME
//      TransporterRegistry
//
//  DESCRIPTION
//      TransporterRegistry (singleton) is the interface to the
//      transporter layer. It handles transporter states and
//      holds the transporter arrays.
//
//***************************************************************************/
|
|
#ifndef TransporterRegistry_H
|
|
#define TransporterRegistry_H
|
|
|
|
#if defined(HAVE_EPOLL_CREATE)
|
|
#include <sys/epoll.h>
|
|
#endif
|
|
#include "TransporterDefinitions.hpp"
|
|
#include <SocketServer.hpp>
|
|
#include <SocketClient.hpp>
|
|
|
|
#include <NdbTCP.h>
|
|
|
|
#include <mgmapi/mgmapi.h>
|
|
|
|
#include <NodeBitmask.hpp>
|
|
#include <NdbMutex.h>
|
|
|
|
// A transporter is always in an IOState.
// NoHalt is used initially and as long as there are no restrictions on
// sending or receiving.
//
// The numeric values are fixed explicitly; HaltIO (3) equals
// HaltInput (1) | HaltOutput (2).
enum IOState {
  NoHalt     = 0,  // no restriction on sending or receiving
  HaltInput  = 1,  // input halted
  HaltOutput = 2,  // output halted
  HaltIO     = 3   // both input and output halted
};
|
|
|
|
|
|
// Human readable description of each TransporterRegistry::PerformState.
// The table is indexed by the numeric value of the state (see
// TransporterRegistry::getPerformStateString), so the entries must stay in
// the same order as the enumerators. Both the strings and the table itself
// are immutable.
static const char * const performStateString[] =
  { "is connected",
    "is trying to connect",
    "does nothing",
    "is trying to disconnect" };
|
|
|
|
// Forward declarations of types that this header only uses through
// pointers and references.
class Transporter;
class TCP_Transporter;
class SHM_Transporter;

class TransporterRegistry;
class SocketAuthenticator;
|
|
|
|
class TransporterService : public SocketServer::Service {
|
|
SocketAuthenticator * m_auth;
|
|
TransporterRegistry * m_transporter_registry;
|
|
public:
|
|
TransporterService(SocketAuthenticator *auth= 0)
|
|
{
|
|
m_auth= auth;
|
|
m_transporter_registry= 0;
|
|
}
|
|
void setTransporterRegistry(TransporterRegistry *t)
|
|
{
|
|
m_transporter_registry= t;
|
|
}
|
|
SocketServer::Session * newSession(NDB_SOCKET_TYPE socket);
|
|
};
|
|
|
|
/**
|
|
* TransporterReceiveData
|
|
*
|
|
* State for pollReceive/performReceive
|
|
* Moved into own class to enable multiple receive threads
|
|
*/
|
|
struct TransporterReceiveData
|
|
{
|
|
TransporterReceiveData();
|
|
~TransporterReceiveData();
|
|
|
|
bool init (unsigned maxTransporters);
|
|
|
|
/**
|
|
* Add a transporter to epoll_set
|
|
* does nothing if epoll not active
|
|
*/
|
|
bool epoll_add(Transporter*);
|
|
|
|
/**
|
|
* Bitmask of transporters currently handled by this instance
|
|
*/
|
|
NodeBitmask m_transporters;
|
|
|
|
/**
|
|
* Bitmask of transporters having data awaiting to be received
|
|
* from its transporter.
|
|
*/
|
|
NodeBitmask m_recv_transporters;
|
|
|
|
/**
|
|
* Bitmask of transporters that has already received data buffered
|
|
* inside its transporter. Possibly "carried over" from last
|
|
* performReceive
|
|
*/
|
|
NodeBitmask m_has_data_transporters;
|
|
|
|
/**
|
|
* Subset of m_has_data_transporters which we completed handling
|
|
* of in previous ::performReceive before we was interrupted due
|
|
* to lack of job buffers. Will skip these when we later retry
|
|
* ::performReceive in order to avoid starvation of non-handled
|
|
* transporters.
|
|
*/
|
|
NodeBitmask m_handled_transporters;
|
|
|
|
/**
|
|
* Bitmask of transporters having received corrupted or unsupported
|
|
* message. No more unpacking and delivery of messages allowed.
|
|
*/
|
|
NodeBitmask m_bad_data_transporters;
|
|
|
|
/**
|
|
* Last node received from if unable to complete all transporters
|
|
* in previous ::performReceive(). Next ::performReceive will
|
|
* resume from first transporter after this.
|
|
*/
|
|
Uint32 m_last_nodeId;
|
|
|
|
/**
|
|
* Spintime calculated as maximum of currently connected transporters.
|
|
* Only applies to shared memory transporters.
|
|
*/
|
|
Uint32 m_spintime;
|
|
|
|
/**
|
|
* Total spintime
|
|
*/
|
|
Uint32 m_total_spintime;
|
|
|
|
#if defined(HAVE_EPOLL_CREATE)
|
|
int m_epoll_fd;
|
|
struct epoll_event *m_epoll_events;
|
|
bool change_epoll(TCP_Transporter *t, bool add);
|
|
#endif
|
|
|
|
/**
|
|
* Used in polling if exists TCP_Transporter
|
|
*/
|
|
ndb_socket_poller m_socket_poller;
|
|
};
|
|
|
|
#include "TransporterCallback.hpp"
|
|
|
|
/**
|
|
* @class TransporterRegistry
|
|
* @brief ...
|
|
*/
|
|
class TransporterRegistry
|
|
{
|
|
friend class SHM_Transporter;
|
|
friend class SHM_Writer;
|
|
friend class Transporter;
|
|
friend class TransporterService;
|
|
public:
|
|
/**
|
|
* Constructor
|
|
*/
|
|
TransporterRegistry(TransporterCallback *callback,
|
|
TransporterReceiveHandle * receiveHandle,
|
|
unsigned maxTransporters = MAX_NTRANSPORTERS);
|
|
|
|
/**
|
|
* this handle will be used in the client connect thread
|
|
* to fetch information on dynamic ports. The old handle
|
|
* (if set) is destroyed, and this is destroyed by the destructor
|
|
*/
|
|
void set_mgm_handle(NdbMgmHandle h);
|
|
NdbMgmHandle get_mgm_handle(void) { return m_mgm_handle; }
|
|
|
|
bool init(NodeId localNodeId);
|
|
|
|
/**
|
|
* Iff using non-default TransporterReceiveHandle's
|
|
* they need to get initalized
|
|
*/
|
|
bool init(TransporterReceiveHandle&);
|
|
|
|
/**
|
|
Perform handshaking of a client connection to accept it
|
|
as transporter.
|
|
|
|
@note Connection should be closed by caller if function
|
|
returns false
|
|
|
|
@param sockfd the socket to handshake
|
|
@param mgs error message describing why handshake failed,
|
|
to be filled in when function return
|
|
@param close_with_reset allows the function to indicate to the caller
|
|
how the socket should be closed when function
|
|
returns false
|
|
|
|
@returns false on failure and true on success
|
|
*/
|
|
bool connect_server(NDB_SOCKET_TYPE sockfd,
|
|
BaseString& msg,
|
|
bool& close_with_reset) const;
|
|
|
|
bool connect_client(NdbMgmHandle *h);
|
|
|
|
/**
|
|
* Given a SocketClient, creates a NdbMgmHandle, turns it into a transporter
|
|
* and returns the socket.
|
|
*/
|
|
NDB_SOCKET_TYPE connect_ndb_mgmd(const char* server_name,
|
|
unsigned short server_port);
|
|
|
|
/**
|
|
* Given a connected NdbMgmHandle, turns it into a transporter
|
|
* and returns the socket.
|
|
*/
|
|
NDB_SOCKET_TYPE connect_ndb_mgmd(NdbMgmHandle *h);
|
|
|
|
private:
|
|
|
|
/**
|
|
* Report the dynamically allocated ports to ndb_mgmd so that clients
|
|
* which want to connect to ndbd can ask ndb_mgmd which port to use.
|
|
*/
|
|
bool report_dynamic_ports(NdbMgmHandle h) const;
|
|
|
|
/**
|
|
* Remove all transporters
|
|
*/
|
|
void removeAll();
|
|
|
|
/**
|
|
* Disconnect all transporters
|
|
*/
|
|
void disconnectAll();
|
|
|
|
/**
|
|
* Reset awake state on shared memory transporters before sleep.
|
|
*/
|
|
int reset_shm_awake_state(TransporterReceiveHandle& recvdata,
|
|
bool& sleep_state_set);
|
|
|
|
/**
|
|
* Set awake state on shared memory transporters after sleep.
|
|
*/
|
|
void set_shm_awake_state(TransporterReceiveHandle& recvdata);
|
|
|
|
public:
|
|
|
|
/**
|
|
* Stops the server, disconnects all the transporter
|
|
* and deletes them and remove it from the transporter arrays
|
|
*/
|
|
virtual ~TransporterRegistry();
|
|
|
|
bool start_service(SocketServer& server);
|
|
struct NdbThread* start_clients();
|
|
bool stop_clients();
|
|
void start_clients_thread();
|
|
|
|
/**
|
|
* Start/Stop receiving
|
|
*/
|
|
void startReceiving();
|
|
void stopReceiving();
|
|
|
|
/**
|
|
* Start/Stop sending
|
|
*/
|
|
void startSending();
|
|
void stopSending();
|
|
|
|
// A transporter is always in a PerformState.
|
|
// PerformIO is used initially and as long as any of the events
|
|
// PerformConnect, ...
|
|
enum PerformState {
|
|
CONNECTED = 0,
|
|
CONNECTING = 1,
|
|
DISCONNECTED = 2,
|
|
DISCONNECTING = 3
|
|
};
|
|
const char *getPerformStateString(NodeId nodeId) const
|
|
{ return performStateString[(unsigned)performStates[nodeId]]; }
|
|
|
|
PerformState getPerformState(NodeId nodeId) const { return performStates[nodeId]; }
|
|
|
|
/**
|
|
* Get and set methods for PerformState
|
|
*/
|
|
void do_connect(NodeId node_id);
|
|
/**
|
|
* do_disconnect can be issued both from send and recv, it is possible to
|
|
* specify from where it is called in send_source parameter, this enables
|
|
* us to provide more detailed information for disconnects.
|
|
*/
|
|
bool do_disconnect(NodeId node_id, int errnum = 0, bool send_source = true);
|
|
bool is_connected(NodeId node_id) const {
|
|
return performStates[node_id] == CONNECTED;
|
|
}
|
|
private:
|
|
void report_connect(TransporterReceiveHandle&, NodeId node_id);
|
|
void report_disconnect(TransporterReceiveHandle&, NodeId node_id, int errnum);
|
|
void report_error(NodeId nodeId, TransporterError errorCode,
|
|
const char *errorInfo = 0);
|
|
void dump_and_report_bad_message(const char file[], unsigned line,
|
|
TransporterReceiveHandle & recvHandle,
|
|
Uint32 * readPtr,
|
|
size_t sizeOfData,
|
|
NodeId remoteNodeId,
|
|
IOState state,
|
|
TransporterError errorCode);
|
|
public:
|
|
|
|
/**
|
|
* Get and set methods for IOState
|
|
*/
|
|
IOState ioState(NodeId nodeId) const;
|
|
void setIOState(NodeId nodeId, IOState state);
|
|
|
|
/**
|
|
* Methods to handle backoff of connection attempts when attempt fails
|
|
*/
|
|
public:
|
|
void indicate_node_up(NodeId nodeId);
|
|
void set_connect_backoff_max_time_in_ms(Uint32 max_time_in_ms);
|
|
private:
|
|
Uint32 get_connect_backoff_max_time_in_laps() const;
|
|
bool get_and_clear_node_up_indicator(NodeId nodeId);
|
|
void backoff_reset_connecting_time(NodeId nodeId);
|
|
bool backoff_update_and_check_time_for_connect(NodeId nodeId);
|
|
|
|
private:
|
|
|
|
bool createTCPTransporter(TransporterConfiguration * config);
|
|
bool createSHMTransporter(TransporterConfiguration * config);
|
|
|
|
public:
|
|
/**
|
|
* configureTransporter
|
|
*
|
|
* Configure a transporter, ie. create new if it
|
|
* does not exist otherwise try to reconfigure it
|
|
*
|
|
*/
|
|
bool configureTransporter(TransporterConfiguration * config);
|
|
|
|
/**
|
|
* Get sum of max send buffer over all transporters, to be used as a default
|
|
* for allocate_send_buffers eg.
|
|
*
|
|
* Must be called after creating all transporters for returned value to be
|
|
* correct.
|
|
*/
|
|
Uint64 get_total_max_send_buffer() {
|
|
DBUG_ASSERT(m_total_max_send_buffer > 0);
|
|
return m_total_max_send_buffer;
|
|
}
|
|
|
|
/**
|
|
* Get transporter's connect count
|
|
*/
|
|
Uint32 get_connect_count(Uint32 nodeId);
|
|
|
|
/**
|
|
* Set or clear overloaded bit.
|
|
* Query if any overloaded bit is set.
|
|
*/
|
|
void set_status_overloaded(Uint32 nodeId, bool val);
|
|
const NodeBitmask& get_status_overloaded() const;
|
|
|
|
/**
|
|
* Get transporter's overload count since connect
|
|
*/
|
|
Uint32 get_overload_count(Uint32 nodeId);
|
|
|
|
/**
|
|
* Set or clear slowdown bit.
|
|
* Query if any slowdown bit is set.
|
|
*/
|
|
void set_status_slowdown(Uint32 nodeId, bool val);
|
|
const NodeBitmask& get_status_slowdown() const;
|
|
|
|
/**
|
|
* Get transporter's slowdown count since connect
|
|
*/
|
|
Uint32 get_slowdown_count(Uint32 nodeId);
|
|
|
|
/**
|
|
* prepareSend
|
|
*
|
|
* When IOState is HaltOutput or HaltIO do not send or insert any
|
|
* signals in the SendBuffer, unless it is intended for the remote
|
|
* QMGR block (blockno 252)
|
|
* Perform prepareSend on the transporter.
|
|
*
|
|
* NOTE signalHeader->xxxBlockRef should contain block numbers and
|
|
* not references
|
|
*/
|
|
|
|
private:
|
|
template <typename AnySectionArg>
|
|
SendStatus prepareSendTemplate(
|
|
TransporterSendBufferHandle *sendHandle,
|
|
const SignalHeader *signalHeader,
|
|
Uint8 prio,
|
|
const Uint32 *signalData,
|
|
NodeId nodeId,
|
|
AnySectionArg section);
|
|
|
|
|
|
public:
|
|
SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
|
|
const SignalHeader *signalHeader,
|
|
Uint8 prio,
|
|
const Uint32 *signalData,
|
|
NodeId nodeId,
|
|
const LinearSectionPtr ptr[3]);
|
|
|
|
SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
|
|
const SignalHeader *signalHeader,
|
|
Uint8 prio,
|
|
const Uint32 *signalData,
|
|
NodeId nodeId,
|
|
class SectionSegmentPool & pool,
|
|
const SegmentedSectionPtr ptr[3]);
|
|
|
|
SendStatus prepareSend(TransporterSendBufferHandle *sendHandle,
|
|
const SignalHeader *signalHeader,
|
|
Uint8 prio,
|
|
const Uint32 *signalData,
|
|
NodeId nodeId,
|
|
const GenericSectionPtr ptr[3]);
|
|
|
|
/**
|
|
* external_IO
|
|
*
|
|
* Equal to: poll(...); perform_IO()
|
|
*
|
|
*/
|
|
void external_IO(Uint32 timeOutMillis);
|
|
|
|
bool performSend(NodeId nodeId, bool need_wakeup = true);
|
|
void performSend();
|
|
|
|
#ifdef DEBUG_TRANSPORTER
|
|
void printState();
|
|
#endif
|
|
|
|
class Transporter_interface {
|
|
public:
|
|
NodeId m_remote_nodeId;
|
|
int m_s_service_port; // signed port number
|
|
const char *m_interface;
|
|
};
|
|
Vector<Transporter_interface> m_transporter_interface;
|
|
void add_transporter_interface(NodeId remoteNodeId, const char *interf,
|
|
int s_port); // signed port. <0 is dynamic
|
|
|
|
int get_transporter_count() const;
|
|
Transporter* get_transporter(NodeId nodeId) const;
|
|
bool is_shm_transporter(NodeId nodeId);
|
|
struct in_addr get_connect_address(NodeId node_id) const;
|
|
|
|
Uint64 get_bytes_sent(NodeId nodeId) const;
|
|
Uint64 get_bytes_received(NodeId nodeId) const;
|
|
|
|
private:
|
|
TransporterCallback *const callbackObj;
|
|
TransporterReceiveHandle *const receiveHandle;
|
|
|
|
NdbMgmHandle m_mgm_handle;
|
|
|
|
struct NdbThread *m_start_clients_thread;
|
|
bool m_run_start_clients_thread;
|
|
|
|
int sendCounter;
|
|
NodeId localNodeId;
|
|
unsigned maxTransporters;
|
|
Uint32 nTransporters;
|
|
Uint32 nTCPTransporters;
|
|
Uint32 nSHMTransporters;
|
|
|
|
#ifdef ERROR_INSERT
|
|
Bitmask<MAX_NTRANSPORTERS/32> m_blocked;
|
|
Bitmask<MAX_NTRANSPORTERS/32> m_blocked_disconnected;
|
|
int m_disconnect_errors[MAX_NTRANSPORTERS];
|
|
|
|
Bitmask<MAX_NTRANSPORTERS/32> m_sendBlocked;
|
|
|
|
Uint32 m_mixology_level;
|
|
#endif
|
|
|
|
/**
|
|
* Arrays holding all transporters in the order they are created
|
|
*/
|
|
Transporter** allTransporters;
|
|
TCP_Transporter** theTCPTransporters;
|
|
SHM_Transporter** theSHMTransporters;
|
|
|
|
/**
|
|
* Array, indexed by nodeId, holding all transporters
|
|
*/
|
|
TransporterType* theTransporterTypes;
|
|
Transporter** theTransporters;
|
|
|
|
/**
|
|
* State arrays, index by host id
|
|
*/
|
|
PerformState* performStates;
|
|
int* m_disconnect_errnum;
|
|
Uint32* m_disconnect_enomem_error;
|
|
IOState* ioStates;
|
|
struct ErrorState {
|
|
TransporterError m_code;
|
|
const char *m_info;
|
|
};
|
|
struct ErrorState *m_error_states;
|
|
|
|
/**
|
|
* peerUpIndicators[nodeId] is set by receiver thread
|
|
* to indicate that node is probable up.
|
|
* It is read and cleared by start clients thread.
|
|
*/
|
|
volatile bool* peerUpIndicators;
|
|
|
|
/**
|
|
* Count of how long time one have been attempting to
|
|
* connect to node nodeId, in units of 100ms.
|
|
*/
|
|
Uint32* connectingTime;
|
|
|
|
/**
|
|
* The current maximal time between connection attempts to a
|
|
* node in units of 100ms.
|
|
* Updated by receive thread, read by start clients thread
|
|
*/
|
|
volatile Uint32 connectBackoffMaxTime;
|
|
|
|
/**
|
|
* Overloaded bits, for fast check.
|
|
* Similarly slowdown bits for fast check.
|
|
*/
|
|
NodeBitmask m_status_overloaded;
|
|
NodeBitmask m_status_slowdown;
|
|
|
|
/**
|
|
* Unpack signal data.
|
|
*
|
|
* Defined in Packer.cpp.
|
|
*/
|
|
Uint32 unpack(TransporterReceiveHandle&,
|
|
Uint32 * readPtr,
|
|
Uint32 bufferSize,
|
|
NodeId remoteNodeId,
|
|
IOState state,
|
|
bool & stopReceiving);
|
|
|
|
Uint32 * unpack(TransporterReceiveHandle&,
|
|
Uint32 * readPtr,
|
|
Uint32 * eodPtr,
|
|
Uint32 * endPtr,
|
|
NodeId remoteNodeId,
|
|
IOState state,
|
|
bool & stopReceiving);
|
|
|
|
static Uint32 unpack_length_words(const Uint32 *readPtr,
|
|
Uint32 maxWords,
|
|
bool extra_signal);
|
|
/**
|
|
* Disconnect the transporter and remove it from
|
|
* theTransporters array. Do not allow any holes
|
|
* in theTransporters. Delete the transporter
|
|
* and remove it from theIndexedTransporters array
|
|
*/
|
|
void removeTransporter(NodeId nodeId);
|
|
|
|
Uint32 poll_TCP(Uint32 timeOutMillis, TransporterReceiveHandle&);
|
|
Uint32 poll_SHM(TransporterReceiveHandle&, bool &any_connected);
|
|
Uint32 poll_SHM(TransporterReceiveHandle&,
|
|
NDB_TICKS start_time,
|
|
Uint32 micros_to_poll);
|
|
Uint32 check_TCP(TransporterReceiveHandle&, Uint32 timeoutMillis);
|
|
Uint32 spin_check_transporters(TransporterReceiveHandle&);
|
|
|
|
int m_shm_own_pid;
|
|
Uint32 m_transp_count;
|
|
|
|
public:
|
|
bool setup_wakeup_socket(TransporterReceiveHandle&);
|
|
void wakeup();
|
|
|
|
inline bool setup_wakeup_socket() {
|
|
assert(receiveHandle != 0);
|
|
return setup_wakeup_socket(* receiveHandle);
|
|
}
|
|
private:
|
|
bool m_has_extra_wakeup_socket;
|
|
NDB_SOCKET_TYPE m_extra_wakeup_sockets[2];
|
|
void consume_extra_sockets();
|
|
|
|
Uint32 *getWritePtr(TransporterSendBufferHandle *handle,
|
|
NodeId node,
|
|
Uint32 lenBytes,
|
|
Uint32 prio,
|
|
SendStatus* error);
|
|
void updateWritePtr(TransporterSendBufferHandle *handle,
|
|
NodeId node, Uint32 lenBytes, Uint32 prio);
|
|
|
|
public:
|
|
/* Various internal */
|
|
void inc_overload_count(Uint32 nodeId);
|
|
void inc_slowdown_count(Uint32 nodeId);
|
|
|
|
private:
|
|
/**
|
|
* Sum of max transporter memory for each transporter.
|
|
* Used to compute default send buffer size.
|
|
*/
|
|
Uint64 m_total_max_send_buffer;
|
|
|
|
public:
|
|
/**
|
|
* Receiving
|
|
*/
|
|
Uint32 pollReceive(Uint32 timeOutMillis, TransporterReceiveHandle& mask);
|
|
Uint32 performReceive(TransporterReceiveHandle&);
|
|
void update_connections(TransporterReceiveHandle&);
|
|
|
|
inline Uint32 pollReceive(Uint32 timeOutMillis) {
|
|
assert(receiveHandle != 0);
|
|
return pollReceive(timeOutMillis, * receiveHandle);
|
|
}
|
|
|
|
inline Uint32 performReceive() {
|
|
assert(receiveHandle != 0);
|
|
return performReceive(* receiveHandle);
|
|
}
|
|
|
|
inline void update_connections() {
|
|
assert(receiveHandle != 0);
|
|
update_connections(* receiveHandle);
|
|
}
|
|
inline Uint32 get_total_spintime()
|
|
{
|
|
assert(receiveHandle != 0);
|
|
return receiveHandle->m_total_spintime;
|
|
}
|
|
inline void reset_total_spintime()
|
|
{
|
|
assert(receiveHandle != 0);
|
|
receiveHandle->m_total_spintime = 0;
|
|
}
|
|
|
|
#ifdef ERROR_INSERT
|
|
/* Utils for testing latency issues */
|
|
bool isBlocked(NodeId nodeId);
|
|
void blockReceive(TransporterReceiveHandle&, NodeId nodeId);
|
|
void unblockReceive(TransporterReceiveHandle&, NodeId nodeId);
|
|
bool isSendBlocked(NodeId nodeId) const;
|
|
void blockSend(TransporterReceiveHandle& recvdata,
|
|
NodeId nodeId);
|
|
void unblockSend(TransporterReceiveHandle& recvdata,
|
|
NodeId nodeId);
|
|
|
|
/* Testing interleaving of signal processing */
|
|
Uint32 getMixologyLevel() const;
|
|
void setMixologyLevel(Uint32 l);
|
|
#endif
|
|
};
|
|
|
|
inline void
|
|
TransporterRegistry::set_status_overloaded(Uint32 nodeId, bool val)
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
if (val != m_status_overloaded.get(nodeId))
|
|
{
|
|
m_status_overloaded.set(nodeId, val);
|
|
if (val)
|
|
inc_overload_count(nodeId);
|
|
}
|
|
if (val)
|
|
set_status_slowdown(nodeId, val);
|
|
}
|
|
|
|
/**
 * @return the bitmask holding the overloaded bit of every node.
 */
inline const NodeBitmask&
TransporterRegistry::get_status_overloaded() const
{
  return m_status_overloaded;
}
|
|
|
|
inline void
|
|
TransporterRegistry::set_status_slowdown(Uint32 nodeId, bool val)
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
if (val != m_status_slowdown.get(nodeId))
|
|
{
|
|
m_status_slowdown.set(nodeId, val);
|
|
if (val)
|
|
inc_slowdown_count(nodeId);
|
|
}
|
|
}
|
|
|
|
/**
 * @return the bitmask holding the slowdown bit of every node.
 */
inline const NodeBitmask&
TransporterRegistry::get_status_slowdown() const
{
  return m_status_slowdown;
}
|
|
|
|
inline void
|
|
TransporterRegistry::indicate_node_up(NodeId nodeId) // Called from receive thread
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
|
|
if (!peerUpIndicators[nodeId])
|
|
{
|
|
peerUpIndicators[nodeId] = true;
|
|
}
|
|
}
|
|
|
|
inline bool
|
|
TransporterRegistry::get_and_clear_node_up_indicator(NodeId nodeId) // Called from start client thread
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
|
|
bool indicator = peerUpIndicators[nodeId];
|
|
if (indicator)
|
|
{
|
|
peerUpIndicators[nodeId] = false;
|
|
}
|
|
return indicator;
|
|
}
|
|
|
|
/**
 * @return the maximal time between connection attempts, counted in
 *         laps where one lap is 100 ms.
 */
inline Uint32
TransporterRegistry::get_connect_backoff_max_time_in_laps() const
{
  return connectBackoffMaxTime;
}
|
|
|
|
inline void
|
|
TransporterRegistry::set_connect_backoff_max_time_in_ms(Uint32 backoff_max_time_in_ms)
|
|
{
|
|
/**
|
|
* Round up backoff_max_time to nearest higher 100ms, since that is lap time
|
|
* in start_client_threads using this function.
|
|
*/
|
|
connectBackoffMaxTime = (backoff_max_time_in_ms + 99) / 100;
|
|
}
|
|
|
|
inline void
|
|
TransporterRegistry::backoff_reset_connecting_time(NodeId nodeId)
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
|
|
connectingTime[nodeId] = 0;
|
|
}
|
|
|
|
inline bool
|
|
TransporterRegistry::backoff_update_and_check_time_for_connect(NodeId nodeId)
|
|
{
|
|
assert(nodeId < MAX_NODES);
|
|
|
|
Uint32 backoff_max_time = get_connect_backoff_max_time_in_laps();
|
|
|
|
if (backoff_max_time == 0)
|
|
{
|
|
// Backoff disabled
|
|
return true;
|
|
}
|
|
|
|
connectingTime[nodeId] ++;
|
|
|
|
if (connectingTime[nodeId] >= backoff_max_time)
|
|
{
|
|
return (connectingTime[nodeId] % backoff_max_time == 0);
|
|
}
|
|
|
|
/**
|
|
* Attempt moments from start of connecting.
|
|
* This function is called from start_clients_thread
|
|
* roughly every 100ms for each node it is connecting
|
|
* to.
|
|
*/
|
|
static const Uint16 attempt_moments[] = {1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024};
|
|
static const int attempt_moments_count = sizeof(attempt_moments) / sizeof(attempt_moments[0]);
|
|
for(int i = 0; i < attempt_moments_count; i ++)
|
|
{
|
|
if (connectingTime[nodeId] == attempt_moments[i])
|
|
{
|
|
return true;
|
|
}
|
|
else if (connectingTime[nodeId] < attempt_moments[i])
|
|
{
|
|
return false;
|
|
}
|
|
}
|
|
return (connectingTime[nodeId] % attempt_moments[attempt_moments_count - 1] == 0);
|
|
}
|
|
|
|
/**
|
|
* A function used to calculate a send buffer level given the size of the node
|
|
* send buffer and the total send buffer size for all nodes and the total send
|
|
* buffer used for all nodes. There is also a thread parameter that specifies
|
|
* the number of threads used (this is 0 except for ndbmtd).
|
|
*/
|
|
void calculate_send_buffer_level(Uint64 node_send_buffer_size,
|
|
Uint64 total_send_buffer_size,
|
|
Uint64 total_used_send_buffer_size,
|
|
Uint32 num_threads,
|
|
SB_LevelType &level);
|
|
#endif // Define of TransporterRegistry_H
|