Anatomy of a Bounded MPMC Queue
This implementation is based on Dmitry Vyukov's bounded MPMC (multi-producer, multi-consumer) queue. It has been fine-tuned, and it is explained below.
#include <atomic> // for std::atomic and std::memory_order_*
#include <cassert> // for assert
#include <cstddef> // for std::size_t
#include <cstdint> // for std::intptr_t
#include <memory> // for std::unique_ptr and std::make_unique
#include <type_traits> // for std::make_signed_t
#include <utility> // for std::move
template<typename T>
/// Bounded multi-producer / multi-consumer FIFO queue backed by a ring buffer.
///
/// enqueue() and dequeue() never wait: they return false when the queue is
/// full or empty, respectively.
///
/// Sequence protocol (one atomic ticket per cell):
///  - Cell i starts with sequence i.
///  - A producer holding ticket `pos` may write the cell when sequence == pos.
///    It publishes the element by storing pos + 1.
///  - A consumer holding ticket `pos` may read the cell when sequence == pos + 1.
///    It then stores pos + capacity, which hands the cell to the producer of
///    the next lap around the ring.
///
/// NOTE: if a producer (or consumer) claims a ticket and then stalls before
/// publishing, the queue reports "empty" (or "full") at that cell until that
/// thread resumes.
class mpmc_bounded_queue {
public:
    /// Creates a queue that holds at most `buffer_size` elements.
    /// @param buffer_size capacity. Must be a power of two and at least 2.
    ///        This is checked only by assert, so it is not enforced under NDEBUG.
    ///        A power of two lets `pos & buffer_mask_` replace `pos % buffer_size`.
    ///        At least 2 keeps the "published" sequence (pos + 1) distinct from
    ///        the "free for the next lap" sequence (pos + size).
    explicit mpmc_bounded_queue(const std::size_t buffer_size)
        : buffer_(std::make_unique<cell_t[]>(buffer_size)), buffer_mask_(buffer_size - 1) {
        assert(buffer_size >= 2 && (buffer_size & (buffer_size - 1)) == 0);
        for (std::size_t i = 0; i != buffer_size; ++i)
            buffer_[i].sequence_.store(i, std::memory_order_relaxed); // cell i is free for ticket i
        enqueue_pos_.store(0, std::memory_order_relaxed);
        dequeue_pos_.store(0, std::memory_order_relaxed);
    }

    mpmc_bounded_queue(mpmc_bounded_queue const &) = delete;             // non-copyable
    mpmc_bounded_queue &operator=(mpmc_bounded_queue const &) = delete; // non-assignable

    /// Copies `data` into the queue.
    /// @return true on success, or false if the queue is full.
    /// NOTE: if T's copy assignment throws after a slot was claimed, that slot is
    /// never published, and consumers stop at it.
    [[nodiscard]] bool enqueue(T const &data) {
        std::size_t pos = 0;
        cell_t *const cell = claim_enqueue_slot(pos);
        if (cell == nullptr)
            return false;
        cell->data_ = data;
        cell->sequence_.store(pos + 1, std::memory_order_release); // publish to consumers
        return true;
    }

    /// Moves `data` into the queue. If the queue is full, `data` is left untouched.
    /// @return true on success, or false if the queue is full.
    /// NOTE: the same exception caveat applies as for the copying overload.
    [[nodiscard]] bool enqueue(T &&data) {
        std::size_t pos = 0;
        cell_t *const cell = claim_enqueue_slot(pos);
        if (cell == nullptr)
            return false;
        cell->data_ = std::move(data);
        cell->sequence_.store(pos + 1, std::memory_order_release); // publish to consumers
        return true;
    }

    /// Removes the oldest element and moves it into `data`.
    /// @return true on success, or false if the queue is empty (`data` is then unchanged).
    /// NOTE: if T's move assignment throws, the slot is never released, and
    /// producers stop at it.
    [[nodiscard]] bool dequeue(T &data) {
        cell_t *cell = nullptr;
        std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            cell = &buffer_[pos & buffer_mask_];
            const std::size_t seq = cell->sequence_.load(std::memory_order_acquire);
            // Subtract in unsigned (modular) arithmetic, then reinterpret as signed.
            // Subtracting two signed values could overflow, which is undefined behaviour.
            const diff_t dif = static_cast<diff_t>(seq - (pos + 1));
            if (dif == 0) {
                // The cell holds an element for our ticket. Try to claim the ticket.
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    break;
                // On failure, pos now holds the current dequeue_pos_ (or is unchanged
                // after a spurious failure). Retry with it.
            } else if (dif < 0) {
                return false; // no element has been published for this ticket yet: empty
            } else {
                pos = dequeue_pos_.load(std::memory_order_relaxed); // another consumer moved ahead
            }
        }
        // The cell is overwritten by the next producer anyway, so move instead of copy.
        data = std::move(cell->data_);
        cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); // free for the next lap
        return true;
    }

private:
    struct cell_t {
        std::atomic<std::size_t> sequence_; // ticket/state of this cell (see class comment)
        T data_;                            // stored element
    };

    /// Signed type with the same width as std::size_t, used for ticket differences.
    using diff_t = std::make_signed_t<std::size_t>;

    /// Reserves the next enqueue ticket for the calling producer.
    /// @param[out] pos receives the claimed ticket on success.
    /// @return the claimed cell, or nullptr if the queue is full.
    cell_t *claim_enqueue_slot(std::size_t &pos) {
        pos = enqueue_pos_.load(std::memory_order_relaxed);
        for (;;) {
            cell_t *const cell = &buffer_[pos & buffer_mask_];
            const std::size_t seq = cell->sequence_.load(std::memory_order_acquire);
            const diff_t dif = static_cast<diff_t>(seq - pos); // see dequeue() for why unsigned first
            if (dif == 0) {
                // The cell is free for our ticket. Try to claim the ticket.
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed))
                    return cell;
                // On failure, pos now holds the current enqueue_pos_ (or is unchanged
                // after a spurious failure). Retry with it.
            } else if (dif < 0) {
                return nullptr; // the previous lap's element is not dequeued yet: full
            } else {
                pos = enqueue_pos_.load(std::memory_order_relaxed); // another producer moved ahead
            }
        }
    }

    static constexpr std::size_t cacheline_size = 64; // assumed cache-line size in bytes
    using cacheline_pad_t = char[cacheline_size];     // padding type to avoid false sharing

    cacheline_pad_t pad0_{};                   // separates this object from preceding data
    std::unique_ptr<cell_t[]> const buffer_;   // ring buffer; read-only after construction
    std::size_t const buffer_mask_;            // capacity - 1; read-only after construction
    cacheline_pad_t pad1_{};                   // keeps the write-hot positions off the read-only line
    std::atomic<std::size_t> enqueue_pos_{};   // next producer ticket
    cacheline_pad_t pad2_{};                   // keeps the two positions at least one cache line apart
    std::atomic<std::size_t> dequeue_pos_{};   // next consumer ticket
    cacheline_pad_t pad3_{};                   // separates from data that follows the object
}; // For a Java implementation, see JCTools' MpmcArrayQueue.java.
Last updated
Was this helpful?