44 lines
1.4 KiB
C++
44 lines
1.4 KiB
C++
|
#include "ControlledQueue.h"
|
||
|
#include <chrono>
|
||
|
#include <mutex>
|
||
|
|
||
|
ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;}
|
||
|
|
||
|
void ControlledQueue::put(Fragment fragment) {
|
||
|
std::unique_lock<std::mutex> lk(m_mtx);
|
||
|
|
||
|
/*
|
||
|
This while synthax is explicitly used to avoid unwanted spurious awakenings.
|
||
|
I'm keeping only the wait with not timeout to transmit backpressure to the upper reading thread, which blocks.
|
||
|
Since that thread has locked the mutex corresponding to the fd, basically the whole fd is blocked.
|
||
|
That way I can tranmit backpressure with TCP to the sending client.
|
||
|
*/
|
||
|
while (! (m_queue.size() < m_maxSize) ) {
|
||
|
m_cv.wait(lk);
|
||
|
}
|
||
|
|
||
|
m_queue.push(fragment);
|
||
|
|
||
|
lk.unlock();
|
||
|
m_cv.notify_all();
|
||
|
}
|
||
|
|
||
|
/*
|
||
|
Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element.
|
||
|
That way it can be catched from the main program that can set the error code accordingly.
|
||
|
*/
|
||
|
Fragment ControlledQueue::get() {
|
||
|
std::unique_lock<std::mutex> lk(m_mtx);
|
||
|
|
||
|
if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) {
|
||
|
throw "Get Timeout";
|
||
|
}
|
||
|
|
||
|
Fragment fragment = m_queue.front();
|
||
|
m_queue.pop();
|
||
|
|
||
|
lk.unlock();
|
||
|
m_cv.notify_all();
|
||
|
|
||
|
return fragment;
|
||
|
}
|