Anatomy of a Bounded MPMC Queue

Based on Dmitry Vyukov's Bounded MPMC queue (1024cores.net), the implementation is fine-tuned and explained below.

#include <cstddef>   // for std::size_t
#include <cstdint>   // for std::intptr_t
#include <atomic>    // for std::atomic and std::memory_order_*
#include <cassert>   // for assert

/**
 * A bounded, lock-free, multi-producer/multi-consumer (MPMC) queue.
 *
 * Each slot carries an atomic sequence number that acts as a ticket:
 *   seq == pos            -> slot is free, a producer holding ticket `pos` may write
 *   seq == pos + 1        -> slot is full, a consumer holding ticket `pos` may read
 * After a dequeue the sequence jumps forward by the capacity, handing the slot
 * to the producer one lap ahead. Producers/consumers claim tickets with a CAS
 * on enqueue_pos_/dequeue_pos_; the sequence stores use release, the loads use
 * acquire, so the data_ write/read is correctly synchronized between threads.
 *
 * T must be default-constructible and copy-assignable (cells are allocated
 * with new[] and data is transferred by copy assignment).
 */
template<typename T>
class mpmc_bounded_queue {
public:
    /**
     * @param buffer_size capacity; must be a power of two and >= 2 so that
     *        `pos & buffer_mask_` is a cheap modulo (asserted below).
     * NOTE(review): the assert runs after the member-init list, so new[] has
     * already executed with an invalid size if the precondition is violated.
     */
    explicit mpmc_bounded_queue(const size_t buffer_size)
        : buffer_(new cell_t [buffer_size]), buffer_mask_(buffer_size - 1) {
        assert(buffer_size >= 2 && (buffer_size & buffer_size - 1) == 0); // buffer_size must be a power of 2 and at least 2
        for (size_t i = 0; i != buffer_size; i += 1)
            buffer_[i].sequence_.store(i, std::memory_order_relaxed); // seq == index: every slot starts free for ticket i (lap 0)
        enqueue_pos_.store(0, std::memory_order_relaxed); // initialize the enqueue position
        dequeue_pos_.store(0, std::memory_order_relaxed); // initialize the dequeue position
        // relaxed is sufficient here: the constructor is single-threaded, and
        // publication of the queue object to other threads must itself synchronize.
    }

    mpmc_bounded_queue(mpmc_bounded_queue const &) = delete; // non-copyable

    void operator =(mpmc_bounded_queue const &) = delete; // non-assignable

    // NOTE(review): destruction must not race with concurrent enqueue/dequeue;
    // the caller is responsible for quiescing all threads first.
    ~mpmc_bounded_queue() {
        delete [] buffer_; // free the allocated buffer
    }

    /**
     * Attempts to append `data`.
     * @return true on success; false if the queue is full at the observed position.
     * Non-blocking: never waits, fails immediately when full.
     */
    bool enqueue(T const &data) {
        cell_t *cell; // pointer to the cell to enqueue into
        size_t pos = enqueue_pos_.load(std::memory_order_relaxed); // load the current enqueue position (a hint; validated by the CAS below)

        for (;;) {
            cell = &buffer_[pos & buffer_mask_]; // get the cell at the current position in circular buffer (mask == cheap modulo, capacity is a power of 2)
            const size_t seq = cell->sequence_.load(std::memory_order_acquire); // acquire: synchronizes with the releasing store in dequeue() that freed this slot

            // Signed difference so that lap wrap-around compares correctly.
            if (const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos); dif == 0) { // seq == pos: cell is free for this ticket
                /**
                 * https://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange
                 * The first is a reference to the expected value, and the second is the desired.
                 *
                 * If the current value of the atomic is equal to the expected input,
                 * it replaces the current value with the desired value and returns true.
                 * And If the current value of the atomic is not equal to the expected input,
                 * it will return false and the expected input is set to the current value.
                 */
                if (enqueue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) // try to claim this ticket; relaxed is enough — the sequence_ acquire/release does the data synchronization
                    break; // successfully claimed the position
                // else: another producer claimed it first (or the weak CAS failed
                // spuriously); the failed CAS has already reloaded `pos` with the
                // current value of enqueue_pos_, so just retry with it.
            } else if (dif < 0) // seq < pos: the consumer a full lap behind has not freed this slot yet -> queue is full
                return false;
            else // seq > pos: another producer already advanced past this ticket; refresh pos and retry
                pos = enqueue_pos_.load(std::memory_order_relaxed); // update pos and retry
        }

        cell->data_ = data; // plain write is safe: we exclusively own the slot until the release-store below publishes it
        cell->sequence_.store(pos + 1, std::memory_order_release); // release: makes data_ visible to the consumer that acquire-loads seq == pos + 1
        return true;
    }

    /**
     * Attempts to remove the oldest element into `data`.
     * @return true on success; false if the queue is empty at the observed position.
     * Non-blocking: never waits, fails immediately when empty.
     */
    bool dequeue(T &data) {
        cell_t *cell; // pointer to the cell to dequeue from
        size_t pos = dequeue_pos_.load(std::memory_order_relaxed); // load the current dequeue position (a hint; validated by the CAS below)

        for (;;) {
            cell = &buffer_[pos & buffer_mask_]; // get the cell at the current position in circular buffer
            const size_t seq = cell->sequence_.load(std::memory_order_acquire); // acquire: synchronizes with the releasing store in enqueue() that filled this slot
            if (const intptr_t dif = static_cast<intptr_t>(seq) - static_cast<intptr_t>(pos + 1); dif == 0) { // seq == pos + 1: cell holds data for this ticket
                if (dequeue_pos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) // try to claim this ticket
                    break; // successfully claimed the position
                // else: another consumer claimed it first (or the weak CAS failed
                // spuriously); the failed CAS has already reloaded `pos` with the
                // current value of dequeue_pos_, so just retry with it.
            } else if (dif < 0) // seq <= pos: the producer has not filled this slot yet -> queue is empty
                return false; 
            else // seq > pos + 1: another consumer already advanced past this ticket; refresh pos and retry
                pos = dequeue_pos_.load(std::memory_order_relaxed); // update pos and retry
        }

        data = cell->data_; // plain read is safe: we exclusively own the slot until the release-store below frees it
        cell->sequence_.store(pos + buffer_mask_ + 1, std::memory_order_release); // seq = pos + capacity: hands the slot to the producer one full lap ahead
        return true;
    }

private:
    // One ring-buffer slot. Note the cells themselves are NOT padded, so
    // neighboring cells can share a cache line — a deliberate size/locality
    // trade-off in the original design.
    struct cell_t {
        std::atomic<size_t> sequence_; // ticket/lap counter; see class comment for the state encoding
        T data_; // data stored in the cell
    };

    static constexpr size_t cacheline_size = 64; // assumed cache line size; std::hardware_destructive_interference_size is the portable alternative
    typedef char cacheline_pad_t[cacheline_size]; // type for cache line padding

    // Padding isolates the read-mostly fields and the two hot atomics on
    // separate cache lines, so producers hammering enqueue_pos_ do not
    // false-share with consumers hammering dequeue_pos_.
    cacheline_pad_t pad0_{}; // with {} to value-initializes the array, zeroing all elements
    cell_t *const buffer_; // 8 bytes on LP64 (pointer-sized)
    size_t const buffer_mask_; // 8 bytes on LP64; both read-only fields fit together in one cacheline
    cacheline_pad_t pad1_{}; // padding to avoid false sharing
    std::atomic<size_t> enqueue_pos_{}; // next producer ticket (monotonically increasing; wrapped via buffer_mask_)
    cacheline_pad_t pad2_{}; // padding to avoid false sharing
    std::atomic<size_t> dequeue_pos_{}; // next consumer ticket (monotonically increasing; wrapped via buffer_mask_)
    cacheline_pad_t pad3_{}; // padding to avoid false sharing
};

For a Java implementation of the same algorithm, see MpmcArrayQueue.java in the JCTools library.

Last updated

Was this helpful?