//
// Created by zzy on 2022/8/29.
//
|
|
|
|
#pragma once
|
|
|
|
// Modified & optimized from crossbeam.
|
|
|
|
#include <atomic>
|
|
#include <cassert>
|
|
#include <cstddef>
|
|
#include <cstdint>
|
|
#include <memory>
|
|
|
|
#include "../common_define.h"
|
|
#include "backoff.h"
|
|
#include "number.h"
|
|
|
|
namespace polarx_rpc {
|
|
|
|
template <class T> class CarrayQueue {
|
|
NO_COPY(CarrayQueue);
|
|
|
|
private:
|
|
// pushed from tail and popped from head
|
|
cache_padded_t<std::atomic<size_t>> head_;
|
|
cache_padded_t<std::atomic<size_t>> tail_;
|
|
|
|
struct slot_t final {
|
|
std::atomic<size_t> stamp;
|
|
T value;
|
|
};
|
|
|
|
std::unique_ptr<slot_t[]> buffer_;
|
|
const size_t cap_;
|
|
const size_t one_lap_;
|
|
|
|
public:
|
|
explicit CarrayQueue(size_t cap)
|
|
: head_(0), tail_(0), buffer_(new slot_t[cap]), cap_(cap),
|
|
one_lap_(Cnumber::next_power_of_two_sz(cap + 1)) {
|
|
static_assert(0 == offsetof(CarrayQueue, head_) &&
|
|
CACHE_LINE_FETCH_ALIGN == offsetof(CarrayQueue, tail_) &&
|
|
2 * CACHE_LINE_FETCH_ALIGN ==
|
|
offsetof(CarrayQueue, buffer_),
|
|
"Bad align of CarrayQueue.");
|
|
assert(cap_ > 0);
|
|
for (size_t i = 0; i < cap_; ++i)
|
|
buffer_[i].stamp.store(
|
|
i, std::memory_order_relaxed); // init first round stamp
|
|
std::atomic_thread_fence(std::memory_order_release);
|
|
}
|
|
|
|
CarrayQueue(CarrayQueue &&another) noexcept
|
|
: head_(another.head_().load(std::memory_order_acquire)),
|
|
tail_(another.tail_().load(std::memory_order_acquire)),
|
|
buffer_(std::move(another.buffer_)), cap_(another.cap_),
|
|
one_lap_(another.one_lap_) {
|
|
buffer_.reset();
|
|
}
|
|
|
|
CarrayQueue &operator=(CarrayQueue &&another) = delete;
|
|
|
|
template <class V> inline bool push(V &&v) {
|
|
Cbackoff backoff;
|
|
auto tail = tail_().load(std::memory_order_relaxed);
|
|
|
|
while (true) {
|
|
auto index = tail & (one_lap_ - 1);
|
|
auto lap = tail & ~(one_lap_ - 1);
|
|
|
|
assert(index < cap_);
|
|
auto &slot = buffer_[index];
|
|
auto stamp = slot.stamp.load(std::memory_order_acquire);
|
|
|
|
if (LIKELY(tail == stamp)) {
|
|
// slot is ready for next one
|
|
auto new_tail = index + 1 < cap_ ? tail + 1 : lap + one_lap_;
|
|
if (LIKELY(tail_().compare_exchange_weak(tail, new_tail,
|
|
std::memory_order_seq_cst,
|
|
std::memory_order_relaxed))) {
|
|
// win the race
|
|
slot.value = std::forward<V>(v);
|
|
slot.stamp.store(tail + 1, std::memory_order_release);
|
|
return true;
|
|
}
|
|
backoff.spin();
|
|
} else if (stamp + one_lap_ == tail + 1) {
|
|
std::atomic_thread_fence(std::memory_order_seq_cst);
|
|
// may full and recheck head
|
|
auto head = head_().load(std::memory_order_relaxed);
|
|
if (head + one_lap_ == tail)
|
|
return false; // full
|
|
backoff.spin();
|
|
// reload and recheck
|
|
tail = tail_().load(std::memory_order_relaxed);
|
|
} else {
|
|
backoff.snooze(); // wait data move and stamp update
|
|
tail = tail_().load(std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
inline bool pop(T &v) {
|
|
Cbackoff backoff;
|
|
auto head = head_().load(std::memory_order_relaxed);
|
|
|
|
while (true) {
|
|
auto index = head & (one_lap_ - 1);
|
|
auto lap = head & ~(one_lap_ - 1);
|
|
|
|
assert(index < cap_);
|
|
auto &slot = buffer_[index];
|
|
auto stamp = slot.stamp.load(std::memory_order_acquire);
|
|
|
|
if (LIKELY(head + 1 == stamp)) {
|
|
// slot was correctly filled
|
|
auto new_head = index + 1 < cap_ ? head + 1 : lap + one_lap_;
|
|
if (LIKELY(head_().compare_exchange_weak(head, new_head,
|
|
std::memory_order_seq_cst,
|
|
std::memory_order_relaxed))) {
|
|
// win the race
|
|
v = std::move(slot.value);
|
|
slot.stamp.store(head + one_lap_, std::memory_order_release);
|
|
return true;
|
|
}
|
|
backoff.spin();
|
|
} else if (stamp == head) {
|
|
std::atomic_thread_fence(std::memory_order_seq_cst);
|
|
// may empty and recheck tail
|
|
auto tail = tail_().load(std::memory_order_relaxed);
|
|
if (tail == head)
|
|
return false; // empty
|
|
backoff.spin();
|
|
// reload and recheck
|
|
head = head_().load(std::memory_order_relaxed);
|
|
} else {
|
|
backoff.snooze(); // wait data move and stamp update
|
|
head = head_().load(std::memory_order_relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
inline size_t capacity() const { return cap_; }
|
|
|
|
inline bool empty() const {
|
|
auto head = head_().load(std::memory_order_seq_cst);
|
|
auto tail = tail_().load(std::memory_order_seq_cst);
|
|
return tail == head;
|
|
}
|
|
|
|
inline bool full() const {
|
|
auto head = head_().load(std::memory_order_seq_cst);
|
|
auto tail = tail_().load(std::memory_order_seq_cst);
|
|
return head + one_lap_ == tail;
|
|
}
|
|
|
|
inline size_t length() const {
|
|
while (true) {
|
|
auto tail = tail_().load(std::memory_order_seq_cst);
|
|
auto head = head_().load(std::memory_order_seq_cst);
|
|
|
|
if (tail_().load(std::memory_order_seq_cst) == tail) {
|
|
auto hix = head & (one_lap_ - 1);
|
|
auto tix = tail & (one_lap_ - 1);
|
|
return hix < tix
|
|
? tix - hix
|
|
: (hix > tix ? cap_ - hix + tix : (tail == head ? 0 : cap_));
|
|
}
|
|
}
|
|
}
|
|
|
|
inline size_t head() const { return head_().load(std::memory_order_seq_cst); }
|
|
|
|
inline size_t tail() const { return tail_().load(std::memory_order_seq_cst); }
|
|
};
|
|
|
|
} // namespace polarx_rpc
|