6 #ifndef UTILS_UTL_BUFFER_H
7 #define UTILS_UTL_BUFFER_H
#include <atomic>
#include <cstdint>
#include <shared_mutex>
/// Number of slots in the fixed-size ring buffer. Must remain a power of two
/// so that BUFFER_MASK can be used for cheap index wrapping.
static constexpr uint32_t BUFFER_SIZE = 16;
/// Index mask: `pos & BUFFER_MASK` maps a monotonically growing cursor onto a
/// slot index (valid only because BUFFER_SIZE is 2^N).
static constexpr uint32_t BUFFER_MASK{BUFFER_SIZE - 1};
// Writes `in` into a ring-buffer slot and publishes the new write cursor.
// NOTE(review): this fragment is garbled — pasted line numbers ("31", "32", …)
// are embedded in the code and several statements are missing between the
// visible lines (the declaration of `buf`, the advance of the write cursor,
// the payload copy, and the unlock — TODO confirm against the full file).
31 void write(
const T& in) {
// Read the current write cursor; no ordering needed yet, publication below.
32 uint32_t pos = enqueue_pos_.load(std::memory_order_relaxed);
// Map the cursor onto a slot; relies on BUFFER_SIZE being a power of two.
36 buf = &buffer_[pos & BUFFER_MASK];
37 }
// Spin until exclusive ownership of the slot's reader/writer lock is
// acquired. NOTE(review): busy-wait on try_lock — presumably intentional to
// avoid blocking in a lock() call; confirm.
while (!buf->guard.try_lock());
// Publish the cursor with release ordering so readers that acquire-load
// enqueue_pos_ observe the completed slot update.
40 enqueue_pos_.store(pos, std::memory_order_release);
43 constexpr uint32_t size() {
return BUFFER_SIZE; }
// Reads the most recently published slot into *out.
// NOTE(review): fragment is garbled — pasted line numbers are embedded and the
// lines that copy the payload into *out (between lock_shared/unlock_shared)
// and the return statement are missing from this view; comments describe only
// what the visible lines show.
45 uint32_t read(T* out)
const {
// Acquire-load pairs with write()'s release store: guarantees the slot
// contents written before publication are visible here.
46 uint32_t pos = enqueue_pos_.load(std::memory_order_acquire);
47 Cell* buf = &buffer_[pos & BUFFER_MASK];
// Shared (reader) lock: multiple readers may copy the slot concurrently;
// a writer holding the exclusive lock blocks them.
48 buf->guard.lock_shared();
50 buf->guard.unlock_shared();
54 uint32_t pos()
const {
return enqueue_pos_.load(std::memory_order_acquire); }
// Reader/writer lock protecting this cell's payload.
// C++17 provides std::shared_mutex; pre-17 standards only have the timed
// variant, so fall back to it there. (The visible fragment declared both
// members with no #else/#endif — restored the obviously intended structure.)
#if __cplusplus >= 201703L
std::shared_mutex guard;
#else
std::shared_timed_mutex guard;
#endif
// Monotonically increasing write cursor; wrapped onto a slot via BUFFER_MASK.
// Zero-initialized so the first write targets slot 0.
std::atomic<uint32_t> enqueue_pos_{};
72 explicit MpmcQueue(uint32_t buffer_length) : buffer_(
new Cell[buffer_length]{}), buffer_mask_(buffer_length - 1) {
73 log_assert((buffer_length >= 2) && ((buffer_length & (buffer_length - 1)) == 0),
74 "Queue length must be 2^N")
for (uint32_t i = 0; i < buffer_length; ++i) {
75 buffer_[i].sequence = i;
79 virtual ~
MpmcQueue() {
delete[] buffer_; }
// Enqueues `in` (Vyukov-style bounded MPMC enqueue).
// @param in      element to copy into the claimed cell.
// @param no_drop presumably: when false, a full queue overwrites/drops the
//                oldest element instead of failing — TODO confirm; the branch
//                that uses it is among the missing lines.
// NOTE(review): fragment is garbled — pasted line numbers are embedded and
// the claim loop structure, the full-queue branch, the payload copy, and the
// return statements are missing from this view.
81 bool write(
const T& in,
bool no_drop =
false) {
83 uint32_t pos = enqueue_pos_.load(std::memory_order_relaxed);
// Candidate cell for the current cursor value.
86 cell = &buffer_[pos & buffer_mask_];
// Acquire-load of the cell's availability ticket; pairs with the release
// store made by the consumer that last freed this cell.
87 uint32_t seq = cell->sequence.load(std::memory_order_acquire);
// dif == 0  -> cell free, try to claim it; dif < 0 -> queue full;
// dif > 0   -> another producer claimed it, reload and retry.
// NOTE(review): C-style casts — static_cast would be the idiomatic form.
88 intptr_t dif = (intptr_t)seq - (intptr_t)pos;
// Weak CAS claims the slot; spurious failure is fine inside the retry loop.
90 if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
// CAS lost: refresh the cursor and retry.
100 pos = enqueue_pos_.load(std::memory_order_relaxed);
// Publish the written cell: sequence == pos + 1 marks it readable.
104 cell->sequence.store(pos + 1, std::memory_order_release);
// NOTE(review): stores a fresh relaxed load of enqueue_pos_ (implicit atomic
// conversion), not the claimed pos + 1 — under contention the snapshot may
// not correspond to this write; looks suspect, confirm intent.
105 enqueue_pos_size_.store(enqueue_pos_, std::memory_order_relaxed);
// Dequeues one element into *out (Vyukov-style bounded MPMC dequeue).
// NOTE(review): fragment is garbled — pasted line numbers are embedded and
// the retry-loop structure, the payload copy into *out, the empty-queue
// branch body, and the return statements are missing from this view.
109 uint32_t read(T* out)
const {
111 uint32_t pos = dequeue_pos_.load(std::memory_order_relaxed);
// Candidate cell for the current read cursor.
113 cell = &buffer_[pos & buffer_mask_];
// Acquire-load pairs with the producer's release store of sequence.
114 uint32_t seq = cell->sequence.load(std::memory_order_acquire);
// dif == 0 (seq == pos + 1) -> cell holds data, try to claim it;
// dif < 0 -> queue empty; dif > 0 -> lost race, reload and retry.
115 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// Weak CAS claims the cell for this consumer.
117 if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
119 }
// Presumably the empty-queue path — its body is among the missing lines.
else if (dif < 0) {
// CAS lost: refresh the cursor and retry.
122 pos = dequeue_pos_.load(std::memory_order_relaxed);
// Free the cell for the producer one full lap later:
// sequence = pos + mask + 1 marks it writable again.
126 cell->sequence.store(pos + buffer_mask_ + 1, std::memory_order_release);
// NOTE(review): snapshot of dequeue_pos_ via implicit atomic load — may not
// match this consumer's claimed position under contention; confirm intent.
127 dequeue_pos_size_.store(dequeue_pos_, std::memory_order_relaxed);
// NOTE(review): body-only fragment — the signature of this member is missing
// from this view (presumably a read()/try_read() overload: the body is
// line-for-line identical to read(T*) above; consider deduplicating once the
// full file is visible).
133 uint32_t pos = dequeue_pos_.load(std::memory_order_relaxed);
// Candidate cell for the current read cursor.
135 cell = &buffer_[pos & buffer_mask_];
// Acquire-load pairs with the producer's release store of sequence.
136 uint32_t seq = cell->sequence.load(std::memory_order_acquire);
// dif == 0 -> data available; dif < 0 -> empty; dif > 0 -> retry.
137 intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
// Weak CAS claims the cell for this consumer.
139 if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
141 }
// Presumably the empty-queue path — its body is among the missing lines.
else if (dif < 0) {
// CAS lost: refresh the cursor and retry.
144 pos = dequeue_pos_.load(std::memory_order_relaxed);
// Free the cell for the producer one lap later.
147 cell->sequence.store(pos + buffer_mask_ + 1, std::memory_order_release);
// NOTE(review): same questionable atomic-to-atomic snapshot as read(T*).
148 dequeue_pos_size_.store(dequeue_pos_, std::memory_order_relaxed);
151 uint32_t pos()
const {
return dequeue_pos_size_.load(std::memory_order_relaxed); }
153 uint32_t size()
const {
154 return enqueue_pos_size_.load(std::memory_order_relaxed) - dequeue_pos_size_.load(std::memory_order_relaxed);
// Vyukov-style availability ticket for the cell: equals the slot index when
// the queue is constructed, pos + 1 after a producer fills it (readable), and
// pos + buffer_mask_ + 1 after a consumer drains it (writable one lap later).
std::atomic<uint32_t> sequence;
164 uint32_t
const buffer_mask_;
166 std::atomic<uint32_t> enqueue_pos_{};
167 std::atomic<uint32_t> enqueue_pos_size_{};
169 mutable std::atomic<uint32_t> dequeue_pos_{};
170 mutable std::atomic<uint32_t> dequeue_pos_size_{};
177 #endif // UTILS_UTL_BUFFER_H