SimpleEventBuilder/backup/ControlledQueue.cxx

44 lines
1.4 KiB
C++

#include "ControlledQueue.h"

#include <chrono>
#include <mutex>
#include <utility>
/**
 * @brief Builds a queue with a capacity limit and a consumer-side timeout.
 *
 * @param maxSize             Stored in m_maxSize; put() blocks while the queue
 *                            holds this many elements or more.
 * @param timeoutMicroseconds Stored in m_timeoutMicroseconds; used by get() as
 *                            the maximum wait, in microseconds.
 */
ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds)
    : m_maxSize(maxSize),
      m_timeoutMicroseconds(timeoutMicroseconds)
{
}
/**
 * @brief Appends a fragment to the queue, blocking while the queue is full.
 *
 * The call waits, without any timeout, until the queue holds fewer than
 * m_maxSize elements, then enqueues the fragment and wakes every thread
 * waiting on the condition variable.
 *
 * @param fragment Fragment to enqueue; taken by value and moved into the queue.
 */
void ControlledQueue::put(Fragment fragment) {
    std::unique_lock<std::mutex> lk(m_mtx);

    // The predicate overload of wait() re-checks the condition after every
    // wake-up, so spurious wake-ups cannot let us overfill the queue.
    //
    // Design note (from the original author): the wait deliberately has no
    // timeout so that back-pressure propagates to the upstream reading thread,
    // which blocks here. That thread is said to hold the mutex of its file
    // descriptor, so the whole fd stalls and TCP flow control pushes the
    // back-pressure to the sending client.
    m_cv.wait(lk, [this] { return m_queue.size() < m_maxSize; });

    // The parameter is our own copy: move it in rather than copying it again.
    m_queue.push(std::move(fragment));

    // Release the lock before notifying so woken threads can acquire it at once.
    lk.unlock();
    m_cv.notify_all();
}
/*
A plain string exception is thrown when the wait ends because of the timeout rather than because an element was inserted.
This lets the main program catch it and set the error code accordingly.
*/
/**
 * @brief Removes and returns the oldest fragment in the queue.
 *
 * Waits up to m_timeoutMicroseconds microseconds for the queue to become
 * non-empty. After removing the element, every thread waiting on the
 * condition variable is woken.
 *
 * @return The fragment that was at the front of the queue.
 * @throws const char* The string literal "Get Timeout" when the queue is
 *         still empty once the timeout has elapsed.
 */
Fragment ControlledQueue::get() {
    std::unique_lock<std::mutex> lk(m_mtx);

    // wait_for() needs a callable predicate that it can re-evaluate on every
    // wake-up; passing a plain bool (evaluated once, up front) is ill-formed.
    const bool hasElement = m_cv.wait_for(
        lk,
        std::chrono::microseconds(m_timeoutMicroseconds),
        [this] { return !m_queue.empty(); });

    if (!hasElement) {
        // Kept as a string literal so existing catch (const char*) handlers work.
        throw "Get Timeout";
    }

    // The front element is about to be popped, so move it out instead of copying.
    Fragment fragment = std::move(m_queue.front());
    m_queue.pop();

    // Release the lock before notifying so woken threads can acquire it at once.
    lk.unlock();
    m_cv.notify_all();
    return fragment;
}