//
// Created by zzy on 2022/7/27.
//
|
|
|
|
#pragma once
|
|
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "../common_define.h"
|
|
#include "../server/server_variables.h"
|
|
|
|
#include "buffer.h"
|
|
#include "../coders/protocol_fwd.h"
|
|
|
|
namespace polarx_rpc {
|
|
|
|
/// Size argument used for every buffer page the output stream creates (4 KiB).
static constexpr size_t DEFAULT_BUFFER_SIZE = 4 * 1024;
|
|
|
|
class CbufferedOutputStream final
|
|
: public google::protobuf::io::ZeroCopyOutputStream {
|
|
NO_COPY(CbufferedOutputStream);
|
|
|
|
private:
|
|
std::vector<Cbuffer> buffers_;
|
|
int buffer_index_;
|
|
size_t bytes_total_;
|
|
|
|
inline void init() {
|
|
if (UNLIKELY(buffer_index_ < 0)) {
|
|
assert(-1 == buffer_index_);
|
|
buffer_index_ = 0;
|
|
if (UNLIKELY(buffers_.empty()))
|
|
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
|
|
}
|
|
}
|
|
|
|
public:
|
|
CbufferedOutputStream() : buffer_index_(-1), bytes_total_(0) {}
|
|
CbufferedOutputStream(CbufferedOutputStream &&another) noexcept
|
|
: buffers_(std::move(another.buffers_)),
|
|
buffer_index_(another.buffer_index_),
|
|
bytes_total_(another.bytes_total_) {
|
|
another.buffers_.clear();
|
|
another.buffer_index_ = -1;
|
|
another.bytes_total_ = 0;
|
|
}
|
|
|
|
CbufferedOutputStream &operator=(CbufferedOutputStream &&another) noexcept {
|
|
buffers_ = std::move(another.buffers_);
|
|
buffer_index_ = another.buffer_index_;
|
|
bytes_total_ = another.bytes_total_;
|
|
another.buffers_.clear();
|
|
another.buffer_index_ = -1;
|
|
another.bytes_total_ = 0;
|
|
}
|
|
|
|
inline void clear() {
|
|
auto cache_pages = max_cached_output_buffer_pages;
|
|
if (UNLIKELY(buffers_.size() > cache_pages))
|
|
buffers_.resize(cache_pages);
|
|
for (auto &buf : buffers_)
|
|
buf.pos() = 0;
|
|
buffer_index_ = -1;
|
|
bytes_total_ = 0;
|
|
}
|
|
|
|
inline void *reserve(size_t sz) {
|
|
if (sz > DEFAULT_BUFFER_SIZE)
|
|
return nullptr;
|
|
|
|
init();
|
|
auto buf = &buffers_[buffer_index_];
|
|
if (UNLIKELY(buf->cap() - buf->pos() < sz)) {
|
|
assert(buf->pos() <= buf->cap());
|
|
/// move to next buf
|
|
bytes_total_ += buf->pos();
|
|
++buffer_index_;
|
|
if (UNLIKELY(buffer_index_ >= static_cast<int>(buffers_.size())))
|
|
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
|
|
buf = &buffers_[buffer_index_];
|
|
}
|
|
|
|
auto ptr = buf->ptr() + buf->pos();
|
|
buf->pos() += sz;
|
|
assert(buf->pos() <= buf->cap());
|
|
return ptr;
|
|
}
|
|
|
|
bool Next(void **data, int *size) final {
|
|
init();
|
|
auto buf = &buffers_[buffer_index_];
|
|
if (UNLIKELY(buf->pos() >= buf->cap())) {
|
|
assert(buf->pos() == buf->cap());
|
|
/// move to next buf
|
|
bytes_total_ += buf->pos();
|
|
++buffer_index_;
|
|
if (UNLIKELY(buffer_index_ >= static_cast<int>(buffers_.size())))
|
|
buffers_.emplace_back(DEFAULT_BUFFER_SIZE);
|
|
buf = &buffers_[buffer_index_];
|
|
}
|
|
|
|
*data = buf->ptr() + buf->pos();
|
|
*size = static_cast<int>(buf->cap() - buf->pos());
|
|
buf->pos() = buf->cap();
|
|
return true;
|
|
}
|
|
|
|
void BackUp(int count) final {
|
|
assert(buffer_index_ < static_cast<int>(buffers_.size()));
|
|
auto &buf = buffers_[buffer_index_];
|
|
assert(count < static_cast<int>(buf.pos()));
|
|
buf.pos() -= count;
|
|
}
|
|
|
|
int64_t ByteCount() const final {
|
|
if (UNLIKELY(-1 == buffer_index_))
|
|
return static_cast<int64_t>(bytes_total_);
|
|
return static_cast<int64_t>(bytes_total_ + buffers_[buffer_index_].pos());
|
|
}
|
|
|
|
/**
|
|
* Backup data for bad data restore.
|
|
*/
|
|
|
|
struct backup_t final {
|
|
size_t buf_pos;
|
|
int index;
|
|
size_t bytes_total;
|
|
};
|
|
|
|
inline void backup(backup_t &b) const {
|
|
if (UNLIKELY(buffer_index_ < 0)) {
|
|
assert(-1 == buffer_index_);
|
|
assert(0 == bytes_total_);
|
|
b.buf_pos = 0;
|
|
b.index = -1;
|
|
b.bytes_total = 0;
|
|
} else {
|
|
assert(buffer_index_ < static_cast<int>(buffers_.size()));
|
|
b.buf_pos = buffers_[buffer_index_].pos();
|
|
b.index = buffer_index_;
|
|
b.bytes_total = bytes_total_;
|
|
}
|
|
}
|
|
|
|
inline void restore(const backup_t &b) {
|
|
if (UNLIKELY(-1 == b.index))
|
|
clear();
|
|
else {
|
|
assert(buffer_index_ >= b.index);
|
|
for (auto i = buffer_index_; i > b.index; --i)
|
|
buffers_[i].pos() = 0;
|
|
buffer_index_ = b.index;
|
|
buffers_[buffer_index_].pos() = b.buf_pos;
|
|
bytes_total_ = b.bytes_total;
|
|
}
|
|
}
|
|
};
|
|
|
|
} // namespace polarx_rpc
|