Added a ControlledQueue class for max size implementation and timeout management

dev
MasterRoby3 2023-08-28 01:35:30 +02:00
parent ac02149c6d
commit f34e7e0dc5
2 changed files with 72 additions and 0 deletions

View File

@ -0,0 +1,44 @@
#include "ControlledQueue.h"
#include <chrono>
#include <mutex>
ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;}
void ControlledQueue::put(Fragment fragment) {
std::unique_lock<std::mutex> lk(m_mtx);
/*
This while synthax is explicitly used to avoid unwanted spurious awakenings.
I'm keeping only the wait with not timeout to transmit backpressure to the upper reading thread, which blocks.
Since that thread has locked the mutex corresponding to the fd, basically the whole fd is blocked.
That way I can tranmit backpressure with TCP to the sending client.
*/
while (! (m_queue.size() < m_maxSize) ) {
m_cv.wait(lk);
}
m_queue.push(fragment);
lk.unlock();
m_cv.notify_all();
}
/*
Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element.
That way it can be catched from the main program that can set the error code accordingly.
*/
Fragment ControlledQueue::get() {
std::unique_lock<std::mutex> lk(m_mtx);
if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) {
throw "Get Timeout";
}
Fragment fragment = m_queue.front();
m_queue.pop();
lk.unlock();
m_cv.notify_all();
return fragment;
}

28
ControlledQueue.h 100644
View File

@ -0,0 +1,28 @@
#pragma once
#include <cstdint>
#include <mutex>
#include <condition_variable>
#include <queue>
#include "fragment_dataformat.h"
class ControlledQueue {
public:
ControlledQueue(uint32_t maxSize, int timeoutMicroseconds);
void put(Fragment fragment);
Fragment get();
bool empty() { return m_queue.empty(); }
private:
std::mutex m_mtx;
std::condition_variable m_cv;
uint32_t m_maxSize;
std::queue<Fragment> m_queue;
int m_timeoutMicroseconds;
};