189 lines
5.5 KiB
C++
189 lines
5.5 KiB
C++
/************************************************************************
|
|
*
|
|
* Copyright (c) 2016 Alibaba.com, Inc. All Rights Reserved
|
|
* $Id: paxos_log.h,v 1.0 08/17/2016 11:20:41 AM yingqiang.zyq(yingqiang.zyq@alibaba-inc.com) $
|
|
*
|
|
************************************************************************/
|
|
|
|
/**
|
|
* @file paxos_log.h
|
|
* @author yingqiang.zyq(yingqiang.zyq@alibaba-inc.com)
|
|
* @date 08/17/2016 11:20:41 AM
|
|
* @version 1.0
|
|
* @brief the interface of paxos log.
|
|
*
|
|
**/
|
|
|
|
#ifndef cluster_paxos_log_INC
|
|
#define cluster_paxos_log_INC
|
|
|
|
#include <deque>
|
|
#include <string>
|
|
#include <mutex>
|
|
#include <atomic>
|
|
#include <functional>
|
|
#include "paxos.pb.h"
|
|
#include "log_meta_cache.h"
|
|
|
|
namespace alisql {
|
|
|
|
enum LogOperation
|
|
{
|
|
kNormal= 0,
|
|
kConfigureChange= 7,
|
|
kMock= 8,
|
|
kPut= 3,
|
|
kDel= 4,
|
|
kCas= 5,
|
|
kTairSet= 6,
|
|
kCommitDep= 11,
|
|
kCommitDepEnd= 12,
|
|
kNop= 10
|
|
};
|
|
|
|
|
|
/**
|
|
* @class PaxosLog
|
|
*
|
|
* @brief
|
|
*
|
|
**/
|
|
class PaxosLog {
|
|
public:
|
|
PaxosLog () :currentTerm_(0) {};
|
|
virtual ~PaxosLog () {};
|
|
virtual int getEntry(uint64_t logIndex, LogEntry &entry, bool fastfail, uint64_t /* serverId */) {
|
|
return getEntry(logIndex, entry, fastfail);
|
|
}
|
|
virtual int getEntry(uint64_t logIndex, LogEntry &entry, bool fastfail= false) = 0;
|
|
virtual const LogEntry *getEntry(uint64_t /* logIndex */, bool /* fastfail */= false) {return NULL;}
|
|
virtual int getEmptyEntry(LogEntry &entry) = 0;
|
|
virtual uint64_t getLeftSize(uint64_t startLogIndex)
|
|
{
|
|
const LogEntry *entry;
|
|
uint64_t lastLogIndex= getLastLogIndex();
|
|
uint64_t size= 0;
|
|
for (uint64_t i= startLogIndex; i <= lastLogIndex; ++i)
|
|
{
|
|
if (NULL != (entry= getEntry(i, true)))
|
|
size += entry->ByteSize();
|
|
else
|
|
break;
|
|
}
|
|
return size;
|
|
}
|
|
virtual bool getLeftSize(uint64_t startLogIndex, uint64_t maxPacketSize)
|
|
{
|
|
const LogEntry *entry;
|
|
uint64_t lastLogIndex= getLastLogIndex();
|
|
uint64_t size= 0;
|
|
for (uint64_t i= startLogIndex; i <= lastLogIndex; ++i)
|
|
{
|
|
if (NULL != (entry= getEntry(i, true)))
|
|
size += entry->ByteSize();
|
|
else
|
|
break;
|
|
|
|
if (size >= maxPacketSize)
|
|
return true;
|
|
}
|
|
return false;
|
|
}
|
|
virtual uint64_t getLastLogTerm()
|
|
{
|
|
LogEntry entry;
|
|
getEntry(getLastLogIndex(), entry, false);
|
|
return entry.term();
|
|
}
|
|
virtual uint64_t getLastLogIndex() = 0;
|
|
virtual uint64_t getLastCachedLogIndex() { return getLastLogIndex(); }
|
|
// get the actual sync index before transferring leader
|
|
// should not be called if paxos mutex is held, there is deadlock risk in AliSQLServer
|
|
virtual uint64_t getSafeLastLogIndex() { return getLastLogIndex(); }
|
|
virtual uint64_t getLength() = 0;
|
|
virtual uint64_t append(const LogEntry &entry) = 0;
|
|
virtual uint64_t appendWithCheck(const LogEntry &entry) {return append(entry);};
|
|
/* Truncate the log entry after (include!) firstIndex. */
|
|
virtual void truncateBackward(uint64_t firstIndex) = 0;
|
|
/* Truncate the log entry before (exclude!) lastIndex */
|
|
virtual void truncateForward(uint64_t lastIndex) = 0;
|
|
virtual void setPurgeLogFilter(std::function<bool(const LogEntry &le)> /* cb */) {};
|
|
virtual int getMetaData(const std::string &key, uint64_t *value) = 0;
|
|
virtual int setMetaData(const std::string &key, const uint64_t value) = 0;
|
|
virtual int getMetaData(const std::string &key, std::string &value) {
|
|
uint64_t ivalue;
|
|
int ret= getMetaData(key, &ivalue);
|
|
if (ret == 0)
|
|
value= std::to_string(ivalue);
|
|
else
|
|
value= "";
|
|
return ret;
|
|
};
|
|
virtual int setMetaData(const std::string &key, const std::string &value) {
|
|
if (value.empty())
|
|
return -1;
|
|
else
|
|
return setMetaData(key, std::stoull(value));
|
|
};
|
|
virtual uint64_t append(const ::google::protobuf::RepeatedPtrField<LogEntry> &entries) = 0;
|
|
virtual void setTerm(uint64_t term) {
|
|
lock_.lock();
|
|
/* We may reset term now 20170321 */
|
|
//assert(term >= currentTerm_);
|
|
currentTerm_= term;
|
|
lock_.unlock();
|
|
}
|
|
|
|
typedef struct Stats {
|
|
std::atomic<uint64_t> countMetaGetTotal;
|
|
std::atomic<uint64_t> countMetaGetInCache;
|
|
Stats()
|
|
:countMetaGetTotal(0)
|
|
,countMetaGetInCache(0)
|
|
{}
|
|
} StatsType;
|
|
const StatsType &getStats() {return stats_;}
|
|
|
|
void initMetaCache() { metaCache_.init(); }
|
|
void resetMetaCache() { metaCache_.reset(); }
|
|
void putLogMeta(uint64_t index, uint64_t term, uint64_t optype, uint64_t info) {
|
|
metaCache_.putLogMeta(index, term, optype, info);
|
|
}
|
|
int getLogMeta(uint64_t index, uint64_t *term, uint64_t *optype, uint64_t *info) {
|
|
int ret= 0;
|
|
if (metaCache_.getLogMeta(index, term, optype, info) == false) {
|
|
LogEntry le;
|
|
ret= getEntry(index, le, false);
|
|
*term = le.term();
|
|
*optype = le.optype();
|
|
*info = le.info();
|
|
} else {
|
|
++stats_.countMetaGetInCache;
|
|
}
|
|
++stats_.countMetaGetTotal;
|
|
return ret;
|
|
}
|
|
virtual bool isStateMachineHealthy() {return true;}
|
|
virtual bool entriesPreCheck(const ::google::protobuf::RepeatedPtrField<LogEntry> &entries) { return 0; }
|
|
|
|
protected:
|
|
uint64_t currentTerm_;
|
|
std::mutex lock_;
|
|
|
|
StatsType stats_;
|
|
private:
|
|
PaxosLog ( const PaxosLog &other ); // copy constructor
|
|
const PaxosLog& operator = ( const PaxosLog &other ); // assignment operator
|
|
|
|
LogMetaCache metaCache_;
|
|
};/* end of class PaxosLog */
|
|
|
|
|
|
|
|
|
|
|
|
|
|
} //namespace alisql
|
|
|
|
#endif //#ifndef cluster_paxos_log_INC
|