Added integration for event building in main envent builder file

dev
MasterRoby3 2023-09-04 01:30:07 +02:00
parent 583d6fdae0
commit c1bfb07196
9 changed files with 359 additions and 160 deletions

View File

@ -1,10 +1,17 @@
#include "ControlledQueue.h"
#include <chrono>
#include <cstdint>
#include <mutex>
#include "ControlledQueue.h"
#include "TimeoutException.h"
ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;}
void ControlledQueue::put(Fragment fragment) {
void ControlledQueue::init(uint32_t maxSize, int timeoutMicroseconds) {
m_maxSize = maxSize;
m_timeoutMicroseconds = timeoutMicroseconds;
}
void ControlledQueue::put(uint32_t word) {
std::unique_lock<std::mutex> lk(m_mtx);
/*
@ -17,7 +24,7 @@ void ControlledQueue::put(Fragment fragment) {
m_cv.wait(lk);
}
m_queue.push(fragment);
m_queue.push(word);
lk.unlock();
m_cv.notify_all();
@ -27,18 +34,18 @@ void ControlledQueue::put(Fragment fragment) {
Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element.
That way it can be catched from the main program that can set the error code accordingly.
*/
Fragment ControlledQueue::get() {
uint32_t ControlledQueue::get() {
std::unique_lock<std::mutex> lk(m_mtx);
if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) {
throw "Get Timeout";
throw TimeoutException();
}
Fragment fragment = m_queue.front();
uint32_t word = m_queue.front();
m_queue.pop();
lk.unlock();
m_cv.notify_all();
return fragment;
return word;
}

View File

@ -6,23 +6,29 @@
#include <condition_variable>
#include <queue>
#include "fragment_dataformat.h"
class ControlledQueue {
public:
/*
Default constructo added for default initialization in std::array. If used, it's necessary to call the init function afterwards.
*/
ControlledQueue() {;}
ControlledQueue(uint32_t maxSize, int timeoutMicroseconds);
void put(Fragment fragment);
Fragment get();
void init(uint32_t maxSize, int timeoutMicroseconds);
void put(uint32_t word);
uint32_t get();
// Simple wrapper to check queue size
int size() { return m_queue.size(); }
// Simple wrapper to see wether the queue is empty
bool empty() { return m_queue.empty(); }
private:
std::mutex m_mtx;
std::condition_variable m_cv;
uint32_t m_maxSize;
std::queue<Fragment> m_queue;
std::queue<uint32_t> m_queue;
int m_timeoutMicroseconds;
};

18
TimeoutException.h 100644
View File

@ -0,0 +1,18 @@
#include <stdexcept>
class TimeoutException
: public std::runtime_error
{
public:
TimeoutException() : std::runtime_error("Timeout") {;}
TimeoutException(const char* message)
: std::runtime_error(message) {
}
TimeoutException(const std::string& message)
: std::runtime_error(message) {
}
};

View File

@ -1,20 +1,49 @@
#include <arpa/inet.h>
#include <csignal>
#include "ControlledQueue.h"
#include <cerrno>
#include <cstddef>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <cstdio> // for fprintf()
#include <functional>
#include <iostream>
#include <queue>
#include <vector>
#include <ratio>
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <cstring> // for strncmp
//my addition to the online guide
#include <csignal>
#include <cstdlib>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <utility>
#include <vector>
#include <chrono>
#include <fstream>
#include <thread>
#include <mutex>
#include <atomic>
#include <array>
#include "fragment_dataformat.h"
#include "full_event_format.h"
#include "TimeoutException.h"
#define MAX_EVENTS 1024
#define MAX_QUEUE_SIZE 1000
#define MAX_TIMEOUT_MICROSEC 500000
#define READER_THREADS 6
// That's a buffer size of 64 kB, to maximize performance without it being too big, according to testing
#define BUFFER_SIZE_WORDS 16384
int min_fd = MAX_EVENTS + 1;
int max_fd = 0;
int makeSocket() {
int sockfd;
@ -39,7 +68,7 @@ void bindSocketPort(int server_fd, int port) {
}
void startListening(int server_fd) {
if (listen(server_fd, 3) < 0) {
if (listen(server_fd, 20000) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
@ -54,72 +83,221 @@ int acceptConnection(int server_fd) {
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
} else {
int flags = fcntl(client_fd, F_GETFL);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void acceptConnectionEpollStyle(int server_fd, int &efd) {
struct sockaddr_in new_remoteAddr;
int addrlen = sizeof(struct sockaddr_in);
while (true) {
int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen);
if (conn_sock == -1) {
// All incoming connections have been processed
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}
// make new connection non-blocking
int flags = fcntl(conn_sock, F_GETFL, 0);
fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK);
// monitor new connection for read events, always in edge triggered
struct epoll_event event;
event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
event.data.fd = conn_sock;
// Allow epoll to monitor the new connection
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) {
perror("epoll_ctl: conn_sock");
break;
}
printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock);
if (max_fd < conn_sock) max_fd = conn_sock;
if (min_fd > conn_sock) min_fd = conn_sock;
}
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
/*
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
std::array<std::mutex, MAX_EVENTS> queues_mutexes;
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int server_fd = makeSocket();
bindSocketPort(server_fd, port);
startListening(server_fd);
int client_fd = acceptConnection(server_fd);
void thradizable(int &epoll_fd, int &master_socket, std::array<ControlledQueue, MAX_EVENTS>& queues_array, const int &th_flag, const int thread_index) {
epoll_event events[MAX_EVENTS];
while (true) {
uint32_t word;
ssize_t bytes = read(client_fd, &word, 4);
if (bytes != 4) {
perror("Receive failed");
if (th_flag == 1) break;
// Time measurements
///auto start = std::chrono::high_resolution_clock::now();
// Returns only the sockets for which there are events
//printf("Before wait\n");
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
//printf("After wait\n");
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
printf("[RICEVUTO]\t0x%x\n", word);
// Iterate on the sockets having events
for (int i = 0; i < nfds; i++) {
//printf("Tot fds = %d reading from %d\n", nfds, i);
int fd = events[i].data.fd;
if (fd == master_socket) {
// If the activity is on the master socket, than it's a new connection request
acceptConnectionEpollStyle(master_socket, epoll_fd);
} else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
// Than the client connection is closed, so I close it
printf("Closing %d", fd);
close(fd);
} else {
//printf("Ev trig th %d with tot b %lu\n", thread_index, part);
// Than we received data from one of the monitored sockets
uint32_t buffer[BUFFER_SIZE_WORDS];
int valread = 0;
//while (valread != EAGAIN) {
std::unique_lock<std::mutex> lk(queues_mutexes[fd]);
valread = recv(fd, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
if (valread > 0) {
//printf("[RICEVUTO]\t FROM %d\n", fd);
int opt_incr = (valread % 4) ? 1 : 0;
for (int q_i = 0; q_i < (valread/4) + opt_incr; q_i++) {
queues_array[fd].put(buffer[q_i]);
}
/*
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}*/
}
lk.unlock();
//}
}
}
///auto end = std::chrono::high_resolution_clock::now();
///double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
///
/*time_taken *= 1e-6;
total_time_taken += time_taken;
if (total_time_taken > 3e4) {
times.push_back(total_time_taken);
tot_received_data.push_back(kBytes_read);
break;
}*/
///
}
}
return 0;
}*/
#define TRUE 1
#define FALSE 0
void builder_thread (std::array<ControlledQueue, MAX_EVENTS> &queues_array, uint32_t &runNumber) {
uint32_t counter = 0;
while (1) {
FullEvent fullEvent;
fullEvent.headerSize = 5;
fullEvent.runNumber = runNumber;
fullEvent.eventNumber = counter;
fullEvent.fragmentsArray = new Fragment[max_fd - min_fd + 1];
for (int i = min_fd; i <= max_fd; i++){
std::unique_lock<std::mutex> lk(queues_mutexes[i]);
int inCurrentEventNumber = 0;
while (inCurrentEventNumber != 1) {
try {
uint32_t starter = queues_array[i].get();
if (starter == FRAGMENT_HEADER_MARKER) {
uint32_t headerSize = queues_array[i].get();
uint32_t fragmentSize = queues_array[i].get();
uint32_t* buffer = new uint32_t[fragmentSize];
buffer[0] = starter;
buffer[1] = headerSize;
buffer[2] = fragmentSize;
for (int j = 3; j < fragmentSize; j++) {
buffer[j] = queues_array[i].get();
}
Fragment fragment = decode_fragment(buffer);
if (fragment.header.detectorEventNumber < fullEvent.eventNumber) {
continue;
} else if (fragment.header.detectorEventNumber == fullEvent.eventNumber) {
inCurrentEventNumber = 1;
fullEvent.fragmentsArray[i - min_fd] = fragment;
} else {
printf("È successo un cazzo di casino");
}
}
} catch (const TimeoutException& ex) {
inCurrentEventNumber = 1;
Fragment fragment;
Header header;
header.sourceIdentifier = i - min_fd;
header.runNumber = runNumber;
header.detectorEventNumber = counter;
header.numberOfStatusElements = 1;
uint32_t firstStatusElement = 0x0;
firstStatusElement |= TIMEOUT_ERROR;
header.statusElementsArray = new uint32_t[header.numberOfStatusElements];
header.statusElementsArray[0] = firstStatusElement;
header.headerSize = 7 + header.numberOfStatusElements;
header.fragmentSize = header.headerSize;
fragment.header = header;
fullEvent.fragmentsArray[i - min_fd] = fragment;
}
}
lk.unlock();
}
}
}
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int master_socket;
const int opt = 1;
int opt = TRUE;
int master_socket , addrlen , new_socket , client_socket[30] ,
max_clients = 30 , activity, i , valread , sd;
int max_sd;
//set of socket descriptors
fd_set readfds;
//initialise all client_socket[] to 0 so not checked
for (i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
master_socket = makeSocket();
@ -135,110 +313,53 @@ int main(int argc, char const *argv[]) {
bindSocketPort(master_socket, port);
startListening(master_socket);
while (true) {
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(master_socket, &readfds);
max_sd = master_socket;
int flags = fcntl(master_socket, F_GETFL, 0);
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
//add child sockets to set
for ( i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(master_socket, &readfds))
{
new_socket = acceptConnection(master_socket);
//add new socket to array of sockets
for (i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
//else its some IO operation on some other socket
for (i = 0; i < max_clients; i++)
{
sd = client_socket[i];
if (FD_ISSET( sd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
uint32_t word;
if ((valread = recv( sd , &word, 4, 0)) == 0)
{
struct sockaddr_in address;
int addrlen;
//Somebody disconnected , get his details and print
getpeername(sd , (struct sockaddr*)&address , \
(socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" ,
inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
printf("Disconnected fd %d", sd);
//Close the socket and mark as 0 in list for reuse
close( sd );
client_socket[i] = 0;
}
//Echo back the message that came in
else
{
printf("[RICEVUTO]\t0x%x FROM %d\n", word, sd);
}
}
}
epoll_event ev, events[MAX_EVENTS];
std::array<uint64_t, MAX_EVENTS> kBytes_read_on_descr;
//The atomic here is used as a flag to tell the thread to stop
std::vector<std::thread> vThreads;
//create the epoll instance
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
printf("Failed to create epoll file descriptor\n");
exit(EXIT_FAILURE);
}
/*
ev.data.fd = master_socket;
// Reading events with edge triggered mode
ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
int client_fd = acceptConnection(server_fd);
// Allowing epoll to monitor the master_socket
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
std::array<std::thread, 2> vT;
std::array<int, 2> thread_flags;
while (true) {
uint32_t word;
ssize_t bytes = read(client_fd, &word, 4);
if (bytes != 4) {
perror("Receive failed");
exit(EXIT_FAILURE);
}
printf("[RICEVUTO]\t0x%x\n", word);
}*/
// Creating the data structure and initialization with max size and timeout ez win
std::array<ControlledQueue, MAX_EVENTS> queues_array;
for (auto& queue : queues_array) {
queue.init(MAX_QUEUE_SIZE, MAX_TIMEOUT_MICROSEC);
}
for (int t_i = 0; t_i < READER_THREADS; t_i++) {
thread_flags[t_i] = 0;
vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::ref(queues_array), std::cref(thread_flags.at(t_i)), t_i);
}
//ADD CONSUMER THREAD
if (close(epoll_fd)) {
printf("Failed to close epoll file descriptor");
exit(EXIT_FAILURE);
}
return 0;
}

View File

@ -10,6 +10,7 @@ typedef struct FullEvent {
uint32_t headerSize;
uint32_t eventSize;
uint32_t runNumber;
uint32_t eventNumber;
Fragment *fragmentsArray;

1
gcc-13 120000
View File

@ -0,0 +1 @@
/home/master-roby3/newgcc/gcc/bin/gcc

Binary file not shown.

View File

@ -0,0 +1,20 @@
#include <ostream>
#include <queue>
#include <array>
#include <iostream>
int main() {
std::queue<int> queue;
std::array<int, 4> range = {1,2,3,4};
queue.push_range(range);
while (!queue.empty()) {
std::cout << queue.front() << std::endl;
queue.pop();
}
return 0;
}

View File

@ -0,0 +1,25 @@
#include <ostream>
#include <queue>
#include <array>
#include <iostream>
int main() {
std::array<std::queue<int>, 3> arr_queues;
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 3; i++) {
arr_queues[i].push(i+j+10);
}
}
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 3; i++) {
std::cout << "Queue numebr " << i << " " << arr_queues[i].front() << std::endl;
arr_queues[i].pop();
}
}
return 0;
}