LFQ/include/queue.hh

115 lines
2.9 KiB
C++

#pragma once
#include <array>
#include <atomic>
#include <cassert>
#include <format>
#include <iostream>
#include <limits>
#include <thread>
namespace lfq {
template <typename T, size_t Size = 256>
class LockFreeQueue {
static_assert(!(Size & (Size - 1)), "Size must be power of two");
struct node {
node() = default;
node(size_t pos, T obj) : pos(pos), obj(obj) {}
node(node& oth) : pos(oth.pos.load()), obj(oth.obj) {}
node(node&& oth) : pos(oth.pos.load()), obj(std::move(oth.obj)) {}
node& operator=(node& oth) {
pos = oth.pos.load();
obj = oth.obj;
return *this;
}
node& operator=(node&& oth) {
pos = oth.pos.load();
obj = std::move(oth.obj);
return *this;
}
std::atomic<size_t> pos{std::numeric_limits<size_t>::max()};
T obj;
};
public:
LockFreeQueue() : enqueue_pos(0), dequeue_pos(0) {}
void push_back(const T& element);
void pull_front(T& element);
size_t apprx_available() const;
bool is_full() const;
bool is_empty() const;
private:
static constexpr size_t mask{Size - 1};
std::atomic<size_t> enqueue_pos;
std::atomic<size_t> dequeue_pos;
std::array<node, Size> elements;
};
/**
* @brief lfq implementation of pull operation.
* result will be written to reference argument
* it will block the thread until success
*/
template <typename T, size_t Size>
void LockFreeQueue<T, Size>::pull_front(T& element) {
node* n_obj;
size_t fix_pos{0};
bool res{false};
while (!res) {
while (is_empty()) {
std::this_thread::yield();
}
fix_pos = dequeue_pos.load();
n_obj = &elements[fix_pos & mask];
if (n_obj->pos != fix_pos) {
// someone installed the value already retry
std::this_thread::yield();
continue;
}
res = dequeue_pos.compare_exchange_weak(fix_pos, fix_pos + 1);
}
element = n_obj->obj;
n_obj->pos = std::numeric_limits<size_t>::max();
}
/**
* @brief lock free implementation for push back operation
* @details trying my best to make it work :D
*/
template <typename T, size_t Size>
void LockFreeQueue<T, Size>::push_back(const T& element) {
size_t fix_pos{0};
bool res{false};
node obj;
while (!res) {
while (is_full()) {
std::this_thread::yield();
}
fix_pos = enqueue_pos.load();
obj = elements[fix_pos & mask];
res = enqueue_pos.compare_exchange_weak(fix_pos, fix_pos + 1);
}
elements[fix_pos & mask] = node{fix_pos, element};
}
template <typename T, size_t Size>
bool LockFreeQueue<T, Size>::is_full() const {
assert(enqueue_pos.load() >= dequeue_pos.load());
return enqueue_pos.load() - dequeue_pos.load() >= Size;
}
template <typename T, size_t Size>
bool LockFreeQueue<T, Size>::is_empty() const {
return enqueue_pos.load() == dequeue_pos.load();
}
template <typename T, size_t Size>
size_t LockFreeQueue<T, Size>::apprx_available() const {
assert(enqueue_pos.load() >= dequeue_pos.load());
return enqueue_pos.load() - dequeue_pos.load();
}
} // namespace lfq