From f34e7e0dc56abbefb7453948da009289be30b721 Mon Sep 17 00:00:00 2001 From: MasterRoby3 Date: Mon, 28 Aug 2023 01:35:30 +0200 Subject: [PATCH] Added a ControlledQueue class for max size implementation and timeout management --- ControlledQueue.cxx | 44 ++++++++++++++++++++++++++++++++++++++++++++ ControlledQueue.h | 28 ++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 ControlledQueue.cxx create mode 100644 ControlledQueue.h diff --git a/ControlledQueue.cxx b/ControlledQueue.cxx new file mode 100644 index 0000000..3340333 --- /dev/null +++ b/ControlledQueue.cxx @@ -0,0 +1,44 @@ +#include "ControlledQueue.h" +#include +#include + +ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;} + +void ControlledQueue::put(Fragment fragment) { + std::unique_lock lk(m_mtx); + + /* + This while synthax is explicitly used to avoid unwanted spurious awakenings. + I'm keeping only the wait with not timeout to transmit backpressure to the upper reading thread, which blocks. + Since that thread has locked the mutex corresponding to the fd, basically the whole fd is blocked. + That way I can tranmit backpressure with TCP to the sending client. + */ + while (! (m_queue.size() < m_maxSize) ) { + m_cv.wait(lk); + } + + m_queue.push(fragment); + + lk.unlock(); + m_cv.notify_all(); +} + +/* + Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element. + That way it can be catched from the main program that can set the error code accordingly. +*/ +Fragment ControlledQueue::get() { + std::unique_lock lk(m_mtx); + + if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) { + throw "Get Timeout"; + } + + Fragment fragment = m_queue.front(); + m_queue.pop(); + + lk.unlock(); + m_cv.notify_all(); + + return fragment; +} \ No newline at end of file diff --git a/ControlledQueue.h b/ControlledQueue.h new file mode 100644 index 0000000..a3d1bc9 --- /dev/null +++ b/ControlledQueue.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include +#include +#include + +#include "fragment_dataformat.h" + + + +class ControlledQueue { + public: + ControlledQueue(uint32_t maxSize, int timeoutMicroseconds); + + void put(Fragment fragment); + Fragment get(); + bool empty() { return m_queue.empty(); } + + private: + std::mutex m_mtx; + std::condition_variable m_cv; + uint32_t m_maxSize; + std::queue m_queue; + int m_timeoutMicroseconds; + +}; \ No newline at end of file