diff --git a/UnitTests/UnitTests.vcxproj b/UnitTests/UnitTests.vcxproj index cdc137988..2f6f6bcd6 100644 --- a/UnitTests/UnitTests.vcxproj +++ b/UnitTests/UnitTests.vcxproj @@ -121,6 +121,7 @@ Create + diff --git a/UnitTests/UnitTests.vcxproj.filters b/UnitTests/UnitTests.vcxproj.filters index a1d5947e6..29bb1fd0d 100644 --- a/UnitTests/UnitTests.vcxproj.filters +++ b/UnitTests/UnitTests.vcxproj.filters @@ -30,6 +30,9 @@ Source Files + + Source Files + diff --git a/UnitTests/pch.h b/UnitTests/pch.h index b7d7f5037..30152cedc 100644 --- a/UnitTests/pch.h +++ b/UnitTests/pch.h @@ -19,6 +19,7 @@ #include #include #include +#include #include #include #include @@ -27,3 +28,4 @@ #include #include +#include diff --git a/UnitTests/ring.cpp b/UnitTests/ring.cpp new file mode 100644 index 000000000..cfd1c94b6 --- /dev/null +++ b/UnitTests/ring.cpp @@ -0,0 +1,55 @@ +/* + SPDX-License-Identifier: MIT + Copyright © 2023 Amebis +*/ + +#include "pch.h" + +using namespace std; +using namespace Microsoft::VisualStudio::CppUnitTestFramework; + +namespace UnitTests +{ + constexpr size_t capacity = 50; + + TEST_CLASS(ring) + { + public: + TEST_METHOD(test) + { + using ring_t = stdex::ring; + ring_t ring; + thread writer([](_Inout_ ring_t& ring) + { + int seed = 0; + for (size_t retries = 1000; retries--;) { + for (auto to_write = static_cast(static_cast(::rand()) * capacity / 5 / RAND_MAX); to_write;) { + int* ptr; size_t num_write; + tie(ptr, num_write) = ring.back(); + if (to_write < num_write) + num_write = to_write; + for (size_t i = 0; i < num_write; i++) + ptr[i] = seed++; + ring.push(num_write); + to_write -= num_write; + } + } + ring.quit(); + }, ref(ring)); + + int seed = 0; + for (;;) { + int* ptr; size_t num_read; + tie(ptr, num_read) = ring.front(); + if (!ptr) _Unlikely_ + break; + if (num_read > 7) + num_read = 7; + for (size_t i = 0; i < num_read; ++i) + Assert::AreEqual(seed++, ptr[i]); + ring.pop(num_read); + } + writer.join(); + } + }; +} diff --git a/include/stdex/ring.hpp b/include/stdex/ring.hpp new file mode 100644 index 000000000..f6a7e8132 --- /dev/null +++ b/include/stdex/ring.hpp @@ -0,0 +1,150 @@ +/* + SPDX-License-Identifier: MIT + Copyright © 2023 Amebis +*/ + +#pragma once + +#include "sal.hpp" +#include +#include +#include +#include + +namespace stdex +{ + /// + /// Ring buffer + /// + /// \tparam T Ring element type + /// \tparam CAPACITY Ring capacity (in number of elements) + /// + template + class ring + { + public: +#pragma warning(suppress:26495) // Don't bother to initialize m_data + ring() : + m_head(0), + m_size(0), + m_quit(false) + {} + + /// + /// Allocates the data after the ring tail. Use push() after the allocated data is populated. + /// + /// \return Pointer to data available for writing and maximum data size to write. Or, `{nullptr, 0}` if quit() has been called. + /// + std::tuple back() + { + std::unique_lock lk(m_mutex); + if (!space()) { + m_head_moved.wait(lk, [&]{return m_quit || space();}); + if (m_quit) _Unlikely_ + return { nullptr, 0 }; + } + size_t tail = wrap(m_head + m_size); + return { &m_data[tail], m_head <= tail ? CAPACITY - tail : m_head - tail }; + } + + /// + /// Notifies the receiver the data was populated. + /// + /// \param[in] size Amount of data that was really populated + /// + void push(_In_ size_t size) + { + { + const std::lock_guard lg(m_mutex); +#ifdef _DEBUG + size_t tail = wrap(m_head + m_size); + assert(size <= (m_head <= tail ? CAPACITY - tail : m_head - tail)); +#endif + m_size += size; + } + m_tail_moved.notify_one(); + } + + /// + /// Peeks the data at the ring head. Use pop() after the data was consumed. + /// + /// \return Pointer to data available for reading and maximum data size to read. Or, `{nullptr, 0}` if quit() has been called. + /// + std::tuple front() + { + std::unique_lock lk(m_mutex); + if (empty()) { + m_tail_moved.wait(lk, [&]{return m_quit || !empty();}); + if (m_quit && empty()) _Unlikely_ + return { nullptr, 0 }; + } + size_t tail = wrap(m_head + m_size); + return { &m_data[m_head], m_head < tail ? m_size : CAPACITY - m_head }; + } + + /// + /// Notifies the sender the data was consumed. + /// + /// \param[in] size Amount of data that was really consumed + /// + void pop(_In_ size_t size) + { + { + const std::lock_guard lg(m_mutex); +#ifdef _DEBUG + size_t tail = wrap(m_head + m_size); + assert(size <= (m_head < tail ? m_size : CAPACITY - m_head)); +#endif + m_head = wrap(m_head + size); + m_size -= size; + } + m_head_moved.notify_one(); + } + + /// + /// Cancells waiting sender and receiver + /// + void quit() + { + { + const std::lock_guard lg(m_mutex); + m_quit = true; + } + m_head_moved.notify_one(); + m_tail_moved.notify_one(); + } + + /// + /// Waits until the ring is flush + /// + void sync() + { + std::unique_lock lk(m_mutex); + m_head_moved.wait(lk, [&]{return m_quit || empty();}); + } + + protected: + inline size_t wrap(_In_ size_t idx) const + { + // TODO: When CAPACITY is power of 2, use & ~(CAPACITY - 1) instead. + return idx % CAPACITY; + } + + inline size_t space() const + { + return CAPACITY - m_size; + } + + inline bool empty() const + { + return !m_size; + } + + protected: + std::mutex m_mutex; + std::condition_variable m_head_moved, m_tail_moved; + size_t m_head, m_size; + bool m_quit; + T m_data[CAPACITY]; + }; +}