polardbxengine/plugin/polarx_rpc/utility/buffered_output_stream.h

162 lines
4.1 KiB
C++

//
// Created by zzy on 2022/7/27.
//
#pragma once
#include <utility>
#include <vector>
#include "../common_define.h"
#include "../server/server_variables.h"
#include "buffer.h"
#include "../coders/protocol_fwd.h"
namespace polarx_rpc {
static constexpr size_t DEFAULT_BUFFER_SIZE = 0x1000; /// 4KB
class CbufferedOutputStream final
: public google::protobuf::io::ZeroCopyOutputStream {
NO_COPY(CbufferedOutputStream);
private:
std::vector<Cbuffer> buffers_;
int buffer_index_;
size_t bytes_total_;
inline void init() {
if (UNLIKELY(buffer_index_ < 0)) {
assert(-1 == buffer_index_);
buffer_index_ = 0;
if (UNLIKELY(buffers_.empty()))
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
}
}
public:
CbufferedOutputStream() : buffer_index_(-1), bytes_total_(0) {}
CbufferedOutputStream(CbufferedOutputStream &&another) noexcept
: buffers_(std::move(another.buffers_)),
buffer_index_(another.buffer_index_),
bytes_total_(another.bytes_total_) {
another.buffers_.clear();
another.buffer_index_ = -1;
another.bytes_total_ = 0;
}
CbufferedOutputStream &operator=(CbufferedOutputStream &&another) noexcept {
buffers_ = std::move(another.buffers_);
buffer_index_ = another.buffer_index_;
bytes_total_ = another.bytes_total_;
another.buffers_.clear();
another.buffer_index_ = -1;
another.bytes_total_ = 0;
}
inline void clear() {
auto cache_pages = max_cached_output_buffer_pages;
if (UNLIKELY(buffers_.size() > cache_pages))
buffers_.resize(cache_pages);
for (auto &buf : buffers_)
buf.pos() = 0;
buffer_index_ = -1;
bytes_total_ = 0;
}
inline void *reserve(size_t sz) {
if (sz > DEFAULT_BUFFER_SIZE)
return nullptr;
init();
auto buf = &buffers_[buffer_index_];
if (UNLIKELY(buf->cap() - buf->pos() < sz)) {
assert(buf->pos() <= buf->cap());
/// move to next buf
bytes_total_ += buf->pos();
++buffer_index_;
if (UNLIKELY(buffer_index_ >= static_cast<int>(buffers_.size())))
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
buf = &buffers_[buffer_index_];
}
auto ptr = buf->ptr() + buf->pos();
buf->pos() += sz;
assert(buf->pos() <= buf->cap());
return ptr;
}
bool Next(void **data, int *size) final {
init();
auto buf = &buffers_[buffer_index_];
if (UNLIKELY(buf->pos() >= buf->cap())) {
assert(buf->pos() == buf->cap());
/// move to next buf
bytes_total_ += buf->pos();
++buffer_index_;
if (UNLIKELY(buffer_index_ >= static_cast<int>(buffers_.size())))
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
buf = &buffers_[buffer_index_];
}
*data = buf->ptr() + buf->pos();
*size = static_cast<int>(buf->cap() - buf->pos());
buf->pos() = buf->cap();
return true;
}
void BackUp(int count) final {
assert(buffer_index_ < static_cast<int>(buffers_.size()));
auto &buf = buffers_[buffer_index_];
assert(count < static_cast<int>(buf.pos()));
buf.pos() -= count;
}
int64_t ByteCount() const final {
if (UNLIKELY(-1 == buffer_index_))
return static_cast<int64_t>(bytes_total_);
return static_cast<int64_t>(bytes_total_ + buffers_[buffer_index_].pos());
}
/**
* Backup data for bad data restore.
*/
struct backup_t final {
size_t buf_pos;
int index;
size_t bytes_total;
};
inline void backup(backup_t &b) const {
if (UNLIKELY(buffer_index_ < 0)) {
assert(-1 == buffer_index_);
assert(0 == bytes_total_);
b.buf_pos = 0;
b.index = -1;
b.bytes_total = 0;
} else {
assert(buffer_index_ < static_cast<int>(buffers_.size()));
b.buf_pos = buffers_[buffer_index_].pos();
b.index = buffer_index_;
b.bytes_total = bytes_total_;
}
}
inline void restore(const backup_t &b) {
if (UNLIKELY(-1 == b.index))
clear();
else {
assert(buffer_index_ >= b.index);
for (auto i = buffer_index_; i > b.index; --i)
buffers_[i].pos() = 0;
buffer_index_ = b.index;
buffers_[buffer_index_].pos() = b.buf_pos;
bytes_total_ = b.bytes_total;
}
}
};
} // namespace polarx_rpc