Motorcortex Core  version: 2.7.6
utl_buffer.h
1 /*
2  * Developer : Alexey Zakharov (alexey.zakharov@vectioneer.com)
3  * All rights reserved. Copyright (c) 2015 VECTIONEER.
4  */
5 
6 #ifndef UTILS_UTL_BUFFER_H
7 #define UTILS_UTL_BUFFER_H
8 
9 #include "utl_log.h"
10 #include "utl_tsan.h"
11 #include <atomic>
12 #include <cstdint>
13 #include <cstdio>
14 #include <shared_mutex>
15 #include <unistd.h>
16 
17 namespace mcx {
18 
19 namespace utils {
20 
/// Single-producer / multiple-consumer "latest value" ring buffer.
///
/// The producer publishes into a fixed 16-cell ring; readers always fetch
/// the most recently published cell. Each cell is guarded by a shared
/// mutex: readers take it shared, the writer takes it exclusively. The
/// writer never blocks on readers — it skips any cell whose exclusive
/// lock cannot be acquired and publishes into the next free cell, then
/// advances the publish counter with a release store.
template <typename T>
class SpmcBuffer {
  // Ring capacity; must remain a power of two so BUFFER_MASK is a valid
  // modulo mask.
  static constexpr uint32_t BUFFER_SIZE = 16;
  static constexpr uint32_t BUFFER_MASK{BUFFER_SIZE - 1};

public:
  SpmcBuffer() : buffer_(new Cell[BUFFER_SIZE]()) {}

  virtual ~SpmcBuffer() { delete[] buffer_; }

  /// Publish a new value. Single producer only.
  ///
  /// Skips past any cell currently pinned (shared-locked) by a reader,
  /// writes into the first lockable cell, and publishes the new position
  /// with release ordering so a reader's acquire load of pos() observes
  /// the fully written data.
  void write(const T& in) {
    uint32_t pos = enqueue_pos_.load(std::memory_order_relaxed);
    Cell* buf;
    do {
      pos += 1;
      buf = &buffer_[pos & BUFFER_MASK];
    } while (!buf->guard.try_lock());  // never wait on readers; skip busy cells
    buf->data = in;
    buf->guard.unlock();
    enqueue_pos_.store(pos, std::memory_order_release);
  }

  /// Ring capacity in cells (NOT the number of unread items).
  // const added so the capacity can be queried through a const reference.
  constexpr uint32_t size() const { return BUFFER_SIZE; }

  /// Copy the most recently published value into @p out.
  /// @return the monotonically increasing position of the value read.
  uint32_t read(T* out) const {
    uint32_t pos = enqueue_pos_.load(std::memory_order_acquire);
    Cell* buf = &buffer_[pos & BUFFER_MASK];
    buf->guard.lock_shared();
    *out = buf->data;
    buf->guard.unlock_shared();
    return pos;
  }

  /// Current publish position (monotonically increasing counter).
  uint32_t pos() const { return enqueue_pos_.load(std::memory_order_acquire); }

protected:
  struct /*alignas(LEVEL1_DCACHE_LINESIZE)*/ Cell {
#if __cplusplus >= 201703L
    std::shared_mutex guard;
#else
    std::shared_timed_mutex guard;
#endif
    T data;
  };
  Cell* const buffer_;
  std::atomic<uint32_t> enqueue_pos_{};
};
68 
69 template <typename T>
70 class MpmcQueue {
71 public:
72  explicit MpmcQueue(uint32_t buffer_length) : buffer_(new Cell[buffer_length]{}), buffer_mask_(buffer_length - 1) {
73  log_assert((buffer_length >= 2) && ((buffer_length & (buffer_length - 1)) == 0),
74  "Queue length must be 2^N") for (uint32_t i = 0; i < buffer_length; ++i) {
75  buffer_[i].sequence = i;
76  }
77  }
78 
79  virtual ~MpmcQueue() { delete[] buffer_; }
80 
81  bool write(const T& in, bool no_drop = false) {
82  Cell* cell;
83  uint32_t pos = enqueue_pos_.load(std::memory_order_relaxed);
84  bool is_ok = true;
85  for (;;) {
86  cell = &buffer_[pos & buffer_mask_];
87  uint32_t seq = cell->sequence.load(std::memory_order_acquire);
88  intptr_t dif = (intptr_t)seq - (intptr_t)pos;
89  if (dif == 0) {
90  if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
91  break;
92  } else if (dif < 0) {
93  if (!no_drop) {
94  drop();
95  is_ok = false;
96  } else {
97  return false;
98  }
99  } else {
100  pos = enqueue_pos_.load(std::memory_order_relaxed);
101  }
102  }
103  cell->data = in;
104  cell->sequence.store(pos + 1, std::memory_order_release);
105  enqueue_pos_size_.store(enqueue_pos_, std::memory_order_relaxed);
106  return is_ok;
107  }
108 
109  uint32_t read(T* out) const {
110  Cell* cell;
111  uint32_t pos = dequeue_pos_.load(std::memory_order_relaxed);
112  for (;;) {
113  cell = &buffer_[pos & buffer_mask_];
114  uint32_t seq = cell->sequence.load(std::memory_order_acquire);
115  intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
116  if (dif == 0) {
117  if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
118  break;
119  } else if (dif < 0) {
120  return pos;
121  } else {
122  pos = dequeue_pos_.load(std::memory_order_relaxed);
123  }
124  }
125  *out = cell->data;
126  cell->sequence.store(pos + buffer_mask_ + 1, std::memory_order_release);
127  dequeue_pos_size_.store(dequeue_pos_, std::memory_order_relaxed);
128  return pos + 1;
129  }
130 
131  void drop() {
132  Cell* cell;
133  uint32_t pos = dequeue_pos_.load(std::memory_order_relaxed);
134  for (;;) {
135  cell = &buffer_[pos & buffer_mask_];
136  uint32_t seq = cell->sequence.load(std::memory_order_acquire);
137  intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
138  if (dif == 0) {
139  if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
140  break;
141  } else if (dif < 0) {
142  return;
143  } else {
144  pos = dequeue_pos_.load(std::memory_order_relaxed);
145  }
146  }
147  cell->sequence.store(pos + buffer_mask_ + 1, std::memory_order_release);
148  dequeue_pos_size_.store(dequeue_pos_, std::memory_order_relaxed);
149  }
150 
151  uint32_t pos() const { return dequeue_pos_size_.load(std::memory_order_relaxed); }
152 
153  uint32_t size() const {
154  return enqueue_pos_size_.load(std::memory_order_relaxed) - dequeue_pos_size_.load(std::memory_order_relaxed);
155  }
156 
157 protected:
158  struct Cell {
159  std::atomic<uint32_t> sequence;
160  T data;
161  };
162 
163  Cell* const buffer_;
164  uint32_t const buffer_mask_;
165 
166  std::atomic<uint32_t> enqueue_pos_{};
167  std::atomic<uint32_t> enqueue_pos_size_{};
168 
169  mutable std::atomic<uint32_t> dequeue_pos_{};
170  mutable std::atomic<uint32_t> dequeue_pos_size_{};
171 };
172 
173 } // namespace utils
174 
175 } // namespace mcx
176 
177 #endif // UTILS_UTL_BUFFER_H
mcx::utils::SpmcBuffer
Definition: utl_buffer.h:22
mcx::utils::SpmcBuffer::Cell
Definition: utl_buffer.h:57
mcx::utils::MpmcQueue
Definition: utl_buffer.h:70
mcx::utils::MpmcQueue::Cell
Definition: utl_buffer.h:158