/************************************************************************ * * Copyright (c) 2016 Alibaba.com, Inc. All Rights Reserved * $Id: configuration.cc,v 1.0 07/30/2016 04:52:09 PM yingqiang.zyq(yingqiang.zyq@alibaba-inc.com) $ * ************************************************************************/ /** * @file configuration.cc * @author yingqiang.zyq(yingqiang.zyq@alibaba-inc.com) * @date 07/30/2016 04:52:09 PM * @version 1.0 * @brief the implement of the configuration * **/ #include #include #include "paxos.h" #include "paxos_configuration.h" namespace alisql { void StableConfiguration::forEach(const SideEffect& sideEffect, void *ptr) { for (auto& it : servers) if(it) sideEffect(*it, ptr); } void StableConfiguration::forEachLearners(const SideEffect& sideEffect, void *ptr) { for (auto& it : learners) if(it) sideEffect(*it, ptr); } bool StableConfiguration::quorumAll(const Predicate& predicate) const { if (servers.empty()) return true; uint64_t count = 0; for (auto& it : servers) if(it) if (predicate(*it)) ++count; return (count >= getServerNum() / 2 + 1); } uint64_t StableConfiguration::quorumMin(const GetValue& getValue) const { if (servers.empty()) return 0; std::vector values; for (auto& it : servers) if(it) values.push_back(getValue(*it)); std::sort(values.begin(), values.end()); return values.at((values.size() - 1)/ 2); } uint64_t StableConfiguration::forceMin(const GetValue& getValue) const { if (servers.empty()) return 0; uint64_t value= UINT64_MAX; for (auto& it : servers) { if (it && it->forceSync) { auto tmp= getValue(*it); if (tmp < value) { value= tmp; } } } return value; } uint64_t StableConfiguration::allMin(const GetValue& getValue) const { if (servers.empty()) return 0; uint64_t value= UINT64_MAX; for (auto& it : servers) { if(it) { auto tmp= getValue(*it); if (tmp < value) { value= tmp; } } } return value; } Configuration::ServerRef StableConfiguration::getServer(uint64_t serverId) { if (serverId == 0) return nullptr; if (serverId < 100) { serverId -= 1; //servers' id start from 1 if (servers.size() > serverId) return servers[serverId]; else return nullptr; } else // getLearners { //learners' id start from 0 /* serverId -= 100; if (learners.size() > serverId) return learners[serverId]; else return nullptr; */ for (auto& learner : learners) { if (learner && learner->serverId == serverId) return learner; } return nullptr; } } Configuration::ServerRef StableConfiguration::getLearnerByAddr(const std::string& addr) { for (auto& learner : learners) { if (learner && getAddr(learner->strAddr) == addr) return learner; } return nullptr; } uint64_t StableConfiguration::getServerIdFromAddr(const std::string& addr) { for (auto& server : servers) { if (server && getAddr(server->strAddr) == addr) return server->serverId; } for (auto& learner : learners) { if (learner && getAddr(learner->strAddr) == addr) return learner->serverId; } return 0; } void StableConfiguration::installConfig(const std::vector& strConfig, uint64_t current, Paxos *paxos, std::shared_ptr localServer) { uint64_t i= 0; std::shared_ptr ptrL; std::shared_ptr ptrR; forEach(&Server::stop, NULL); servers.clear(); servers.resize(strConfig.size()); for (auto &str : strConfig) { ++ i; if (str == "0") { /* empty server */ servers[i-1]= nullptr; continue; } serversNum++; if (i != current) { servers[i-1]= (ptrR= std::make_shared(i)); initServerFromString(ptrR, str); ptrR->srv= paxos->getService(); ptrR->paxos= paxos; ptrR->heartbeatTimer= std::unique_ptr(new ThreadTimer(paxos->getService()->getThreadTimerService(), paxos->getService(), paxos->getHeartbeatTimeout(), ThreadTimer::Repeatable, Paxos::heartbeatCallback, std::move(std::weak_ptr(ptrR)))); } else { std::shared_ptr tmpServer; tmpServer= localServer ? localServer : std::make_shared(i); tmpServer->serverId= i; servers[i-1]= (ptrL= tmpServer); initServerFromString(ptrL, str); paxos->setLocalServer(ptrL); ptrL->paxos= paxos; } } assert(i == strConfig.size()); } std::string StableConfiguration::memberToString(ServerRef server) { std::string ret(""); ret += server->strAddr; ret += "#"; ret += server->electionWeight + '0'; ret += server->forceSync ? "F" : "N"; return ret; } std::string StableConfiguration::learnerToString(ServerRef server) { std::string ret(""); ret += server->strAddr; ret += "$"; ret += server->learnerSource/10 + '0'; ret += server->learnerSource%10 + '0'; return ret; } std::string StableConfiguration::configToString(std::vector& servers, const std::string& localAddr, bool forceMember) { std::string ret(""); uint64_t localIndex= 0, i= 1; for (auto& server : servers) { if (server != nullptr) { if (localAddr != "" || forceMember) { ret += memberToString(server); } else { ret += learnerToString(server); } ret += ";"; if (localAddr != "" && server->strAddr == localAddr) localIndex= i; } else { ret += "0;"; } ++ i; } if (i != 1) { if (localAddr != "") { assert(localIndex != 0); ret[ret.size() - 1]= '@'; ret += std::to_string(localIndex); } else { ret.resize(ret.size() - 1); } } return ret; } Configuration::ServerRef StableConfiguration::addServer(std::vector& servers, ServerRef newServer, bool useAppend) { uint64_t id= 0; for (auto& it : servers) { ++ id; if (!useAppend && it == nullptr) { it= newServer; newServer->serverId= id; return newServer; } } ++ id; servers.push_back(newServer); newServer->serverId= id; return newServer; } void StableConfiguration::initServerFromString(ServerRef server, std::string str, bool isLearner) { auto size= str.size(); uint64_t pos= 0; if (str.at(size -3) == '#') { server->forceSync= (str.at(size -1) == 'S'); server->electionWeight= str.at(size -2) - '0'; str.resize(size -3); } else if ((pos= str.find('$', 0)) != std::string::npos) { uint64_t tmpSource= str.at(pos + 1) - '0'; tmpSource *= 10; tmpSource += str.at(pos + 2) - '0'; str.resize(pos); server->learnerSource= tmpSource; server->forceSync= 0; server->electionWeight= 5; } else { server->forceSync= 0; if (isLearner) server->electionWeight= 0; else server->electionWeight= 5; } server->strAddr= str; } void StableConfiguration::initServerDefault(ServerRef serverArg) { auto server= std::dynamic_pointer_cast(serverArg); server->isLearner= false; server->forceSync= false; server->electionWeight= 5; server->appliedIndex= 0; } std::vector StableConfiguration::stringToVector(const std::string& str, uint64_t& currentIndex) { /* Return obj has already been optimized in C++11. */ /* 127.0.0.1:10001;127.0.0.1:10002;127.0.0.1:10003@1 */ /* 127.0.0.1:10001#9N;127.0.0.1:10002#5N;127.0.0.1:10003#0N@1 */ std::vector ret; uint64_t start=0, stop= 0; do { stop= str.find(';', start); if (stop == std::string::npos) break; /* If learner source is larger than 109, skip this ';', it represents 11. */ if (stop > 1 && str[stop - 1] == '$') stop= str.find(';', stop + 1); if (stop == std::string::npos) break; ret.push_back(str.substr(start, stop - start)); start= stop + 1; } while (stop != std::string::npos); stop= str.find('@', start); /* If learner source is larger than 159, skip this '@', it represents 16. */ if (stop == std::string::npos || (stop > 1 && str[stop - 1] == '$')) { if (str.size() != 0) ret.push_back(str.substr(start, stop - start)); } else { ret.push_back(str.substr(start, stop - start)); start= stop + 1; currentIndex= std::stoull(str.substr(start)); } return ret; } uint64_t StableConfiguration::getServerNum() const { uint64_t cnt= 0; for (auto& it : servers) if(it) ++cnt; return cnt; } uint64_t StableConfiguration::getLearnerNum() const { uint64_t cnt= 0; for (auto& it : learners) if(it) ++cnt; return cnt; } bool StableConfiguration::needWeightElection(uint64_t localWeight) { for (auto& it : servers) { if (it && it->electionWeight > localWeight) return true; } return false; } uint64_t StableConfiguration::getMaxWeightServerId(uint64_t baseEpoch, ServerRef localServer) { uint64_t ret= localServer->serverId; uint64_t priv= localServer->electionWeight; for (auto& it : servers) { if (it == nullptr) continue; auto lastAckEpoch= it->getLastAckEpoch(); if (it->electionWeight > priv && lastAckEpoch > baseEpoch) { ret= it->serverId; priv= it->electionWeight; } } return ret; } int StableConfiguration::addMember(const std::string& strAddr, Paxos *paxos) { std::string logBuf= ""; for (auto& server : servers) { if (server) { logBuf += std::to_string(server->serverId); logBuf += ":"; logBuf += server->strAddr; logBuf += " "; } } easy_warn_log("Server %d : StableConfiguration::addMember: current servers(%s), add server(%s)\n", paxos->getLocalServer()->serverId, logBuf.c_str(), strAddr.c_str()); for (auto it= learners.begin(); it != learners.end(); ++it) if (*it && (*it)->strAddr == strAddr) { auto server= std::dynamic_pointer_cast(*it); serversNum.fetch_add(1); addServer(servers, server); *it= nullptr; //servers.push_back(server); //learners.erase(it); //server->serverId= servers.size(); initServerDefault(server); if (paxos->getState() == Paxos::LEADER) { server->beginLeadership((void *)1); } else server->stepDown(NULL); server->connect(NULL); logBuf= ""; for (auto& server : servers) { if (server) { logBuf += std::to_string(server->serverId); logBuf += ":"; logBuf += server->strAddr; logBuf += " "; } } while (learners.size() > 0 && learners[learners.size() - 1] == nullptr) learners.resize(learners.size() - 1); paxos->getLog()->setMetaData(Paxos::keyLearnerConfigure, learnersToString()); paxos->getLog()->setMetaData(Paxos::keyMemberConfigure, membersToString(paxos->getLocalServer()->strAddr)); easy_warn_log("Server %d : StableConfiguration::addMember: success current servers(%s)\n", paxos->getLocalServer()->serverId, logBuf.c_str()); return 0; } easy_warn_log("Server %d : StableConfiguration::addMember: fail current servers(%s)\n", paxos->getLocalServer()->serverId, logBuf.c_str()); return -1; } int StableConfiguration::delMember(const std::string& strAddr, Paxos *paxos) { int ret= -1; for (auto it= servers.begin(); it != servers.end(); ++ it) { if (*it == nullptr) continue; if ((*it)->strAddr == strAddr) { auto serverId= (*it)->serverId; (*it)->stop(NULL); serversNum.fetch_sub(1); //servers.erase(it); *it= nullptr; ret= 0; /* recycle the last nullptr in the servers */ if (serverId == servers.size() && servers.size() > 0) { while (servers[servers.size() - 1] == nullptr) servers.resize(servers.size() - 1); } if (paxos) paxos->getLog()->setMetaData(Paxos::keyMemberConfigure, membersToString(paxos->getLocalServer()->strAddr)); break; } } return ret; } int StableConfiguration::configureLearner(const uint64_t serverId, uint64_t source, Paxos *paxos) { auto server= this->getServer(serverId); if (!server) { easy_warn_log("Server %d : StableConfiguration::configureLearner: server %d not found, just skip.", paxos->getLocalServer()->serverId, serverId); return 0; } server->learnerSource= source; if (paxos) paxos->getLog()->setMetaData(Paxos::keyLearnerConfigure, learnersToString()); return 0; } int StableConfiguration::configureMember(const uint64_t serverId, bool forceSync, uint electionWeight, Paxos *paxos) { auto server= this->getServer(serverId); if (!server) { easy_warn_log("Server %d : StableConfiguration::configureMember: server %d not found, just skip.", paxos->getLocalServer()->serverId, serverId); return 0; } server->forceSync= forceSync; server->electionWeight= electionWeight; if (paxos) paxos->getLog()->setMetaData(Paxos::keyMemberConfigure, membersToString(paxos->getLocalServer()->strAddr)); return 0; } void StableConfiguration::addLearners(const std::vector& strConfig, Paxos *paxos, bool replaceAll) { if (strConfig.size() == 0) return; std::shared_ptr ptrR; uint64_t i= 0; /* if (learners.size() == 0) i= 100; else i= learners.back()->serverId + 1; */ for (auto &str : strConfig) { ++ i; if (str == "0") { /* empty server */ if (learners.size() >= i) learners[i-1]= nullptr; else { assert(learners.size() == i - 1); learners.push_back(nullptr); } continue; } //learners.push_back(ptrR= std::make_shared(i++)); ptrR= std::make_shared(0); addServer(learners, ptrR, replaceAll); ptrR->serverId += 99; ptrR->srv= paxos->getService(); ptrR->paxos= paxos; ptrR->isLearner= true; initServerFromString(ptrR, str); //ptrR->strAddr= str; //ptrR->forceSync= false; //ptrR->electionWeight= 0; ptrR->appliedIndex= 0; /* The Learner will become follower finally, so we create heartbeatTimer here. */ ptrR->heartbeatTimer= std::unique_ptr(new ThreadTimer(paxos->getService()->getThreadTimerService(), paxos->getService(), paxos->getHeartbeatTimeout(), ThreadTimer::Repeatable, Paxos::heartbeatCallback, std::move(std::weak_ptr(ptrR)))); if ((paxos->getState() == Paxos::LEADER && ptrR->learnerSource == 0) || ptrR->learnerSource == paxos->getLocalServer()->serverId) ptrR->beginLeadership((void *)1); } paxos->getLog()->setMetaData(Paxos::keyLearnerConfigure, learnersToString()); } void StableConfiguration::delLearners(const std::vector& strConfig, Paxos *paxos) { for (auto &str : strConfig) { for (auto it= learners.begin(); it != learners.end(); ++it) if (*it && (*it)->strAddr == str) { /* TODO: clear for RemoteServer. */ (*it)->stop(NULL); *it= nullptr; //learners.erase(it); break; } } while (learners.size() > 0 && learners[learners.size() - 1] == nullptr) learners.resize(learners.size() - 1); if (paxos) paxos->getLog()->setMetaData(Paxos::keyLearnerConfigure, learnersToString()); } void StableConfiguration::delAllLearners() { for (auto it= learners.begin(); it != learners.end(); ++it) if (*it) (*it)->stop(NULL); learners.clear(); } void StableConfiguration::delAllRemoteServer(const std::string& localStrAddr, Paxos *paxos) { //uint64_t id= 1; for (auto it= servers.begin(); it != servers.end();++ it) { if (*it && (*it)->strAddr != localStrAddr) { (*it)->stop(NULL); serversNum.fetch_sub(1); *it= nullptr; //servers.erase(it); } } assert(serversNum.load() == 1); if (paxos) paxos->getLog()->setMetaData(Paxos::keyMemberConfigure, membersToString(paxos->getLocalServer()->strAddr)); if (servers.size() > 0) { while (servers[servers.size() - 1] == nullptr) servers.resize(servers.size() - 1); } } void StableConfiguration::mergeFollowerMeta(const ::google::protobuf::RepeatedPtrField< ::alisql::ClusterInfoEntry >& ciEntries) { for (auto cit= ciEntries.begin(); cit != ciEntries.end(); ++ cit) { auto learner= std::dynamic_pointer_cast(getServer(cit->serverid())); if (learner == nullptr) { easy_warn_log("StableConfiguration::mergeFollowerMeta: try to find server %d, but not in current configure!!\n", cit->serverid()); continue; } if (learner->serverId == cit->serverid() && learner->learnerSource == cit->learnersource()) { learner->matchIndex.store(cit->matchindex()); learner->nextIndex.store(cit->nextindex()); learner->appliedIndex.store(cit->appliedindex()); learner->lastMergeTP= learner->now(); } } } std::string StableConfiguration::getAddr(const std::string& addr) { std::string ret= addr; uint64_t pos= std::string::npos; pos= ret.find('$', 0); if (pos == std::string::npos) pos= ret.find('#', 0); if (pos != std::string::npos) ret.resize(pos); return ret; } bool StableConfiguration::isServerInVector(const std::string& server, const std::vector& strConfig) { std::string addr= getAddr(server); for (auto& str : strConfig) { if (addr == getAddr(str)) return true; } return false; } void StableConfiguration::reset_flow_control() { for (auto& server : servers) if (server) server->flowControl = 0; for (auto& learner : learners) if (learner) learner->flowControl = 0; } void StableConfiguration::set_flow_control(uint64_t serverId, int64_t fc) { for (auto& server : servers) { if (server && server->serverId == serverId) { server->flowControl = fc; return; } } for (auto& learner : learners) { if (learner && learner->serverId == serverId) { learner->flowControl = fc; return; } } } } //namespace alisql