67 lines
1.6 KiB
C++
67 lines
1.6 KiB
C++
/**
|
|
* Replica Consistent Read Manager
|
|
*/
|
|
|
|
#pragma once
|
|
|
|
#include <queue>
|
|
#include <mutex>
|
|
#include <condition_variable>
|
|
#include <utility>
|
|
#include <atomic>
|
|
#include <algorithm>
|
|
|
|
extern uint64_t opt_replica_read_timeout;
|
|
|
|
static const uint64_t DEFAULT_REPLICA_READ_TIMEOUT = 3000; // ms
|
|
|
|
class ReplicaReadManager {
|
|
|
|
struct ReadLsnCond {
|
|
const uint64_t lsn;
|
|
std::condition_variable cv;
|
|
|
|
ReadLsnCond(uint64_t lsn): lsn(lsn) {
|
|
}
|
|
|
|
bool operator() (ReadLsnCond *a, ReadLsnCond *b) {
|
|
// ordered by lsn ascending in the wait queue
|
|
return a->lsn > b->lsn;
|
|
}
|
|
};
|
|
|
|
template<typename T,
|
|
class Container=std::vector<T>,
|
|
class Compare=std::less<typename Container::value_type>>
|
|
class erasable_priority_queue : public std::priority_queue<T, std::vector<T>> {
|
|
public:
|
|
bool erase(const T& value) {
|
|
auto iter = std::find(this->c.begin(), this->c.end(), value);
|
|
if (iter != this->c.end()) {
|
|
this->c.erase(iter);
|
|
std::make_heap(this->c.begin(), this->c.end(), this->comp);
|
|
return true;
|
|
} else {
|
|
return false;
|
|
}
|
|
}
|
|
};
|
|
|
|
std::mutex m_mutex;
|
|
erasable_priority_queue<ReadLsnCond *, std::vector<ReadLsnCond *>, ReadLsnCond> m_wait_queue;
|
|
|
|
/**
|
|
* Current applied index, should be monotonically increasing
|
|
*/
|
|
std::atomic<uint64_t> m_applied_lsn;
|
|
|
|
public:
|
|
ReplicaReadManager() {
|
|
}
|
|
|
|
bool wait_for_lsn(uint64_t read_lsn);
|
|
void update_lsn(uint64_t new_lsn);
|
|
};
|
|
|
|
extern ReplicaReadManager replica_read_manager;
|