diff --git a/ControlledQueue.cxx b/ControlledQueue.cxx index 3340333..5a4fbf2 100644 --- a/ControlledQueue.cxx +++ b/ControlledQueue.cxx @@ -1,10 +1,17 @@ -#include "ControlledQueue.h" #include +#include #include +#include "ControlledQueue.h" +#include "TimeoutException.h" ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;} -void ControlledQueue::put(Fragment fragment) { +void ControlledQueue::init(uint32_t maxSize, int timeoutMicroseconds) { + m_maxSize = maxSize; + m_timeoutMicroseconds = timeoutMicroseconds; +} + +void ControlledQueue::put(uint32_t word) { std::unique_lock lk(m_mtx); /* @@ -17,7 +24,7 @@ void ControlledQueue::put(Fragment fragment) { m_cv.wait(lk); } - m_queue.push(fragment); + m_queue.push(word); lk.unlock(); m_cv.notify_all(); @@ -27,18 +34,18 @@ void ControlledQueue::put(Fragment fragment) { Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element. That way it can be catched from the main program that can set the error code accordingly. */ -Fragment ControlledQueue::get() { +uint32_t ControlledQueue::get() { std::unique_lock lk(m_mtx); if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) { - throw "Get Timeout"; + throw TimeoutException(); } - Fragment fragment = m_queue.front(); + uint32_t word = m_queue.front(); m_queue.pop(); lk.unlock(); m_cv.notify_all(); - return fragment; + return word; } \ No newline at end of file diff --git a/ControlledQueue.h b/ControlledQueue.h index a3d1bc9..d7bcaab 100644 --- a/ControlledQueue.h +++ b/ControlledQueue.h @@ -6,23 +6,29 @@ #include #include -#include "fragment_dataformat.h" - - class ControlledQueue { public: + /* + Default constructo added for default initialization in std::array. If used, it's necessary to call the init function afterwards. + */ + ControlledQueue() {;} ControlledQueue(uint32_t maxSize, int timeoutMicroseconds); - void put(Fragment fragment); - Fragment get(); + void init(uint32_t maxSize, int timeoutMicroseconds); + void put(uint32_t word); + uint32_t get(); + // Simple wrapper to check queue size + int size() { return m_queue.size(); } + + // Simple wrapper to see wether the queue is empty bool empty() { return m_queue.empty(); } private: std::mutex m_mtx; std::condition_variable m_cv; uint32_t m_maxSize; - std::queue m_queue; + std::queue m_queue; int m_timeoutMicroseconds; }; \ No newline at end of file diff --git a/TimeoutException.h b/TimeoutException.h new file mode 100644 index 0000000..f94d487 --- /dev/null +++ b/TimeoutException.h @@ -0,0 +1,18 @@ +#include + + +class TimeoutException + : public std::runtime_error +{ + public: + + TimeoutException() : std::runtime_error("Timeout") {;} + + TimeoutException(const char* message) + : std::runtime_error(message) { + } + + TimeoutException(const std::string& message) + : std::runtime_error(message) { + } +}; \ No newline at end of file diff --git a/event_builder.cxx b/event_builder.cxx index ea35e72..ede4dff 100644 --- a/event_builder.cxx +++ b/event_builder.cxx @@ -1,20 +1,49 @@ -#include -#include +#include "ControlledQueue.h" +#include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include - -#include -#include - +#include // for fprintf() +#include +#include #include -#include +#include +#include // for close(), read() +#include // for epoll_create1(), epoll_ctl(), struct epoll_event +#include // for strncmp +//my addition to the online guide +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "fragment_dataformat.h" +#include "full_event_format.h" +#include "TimeoutException.h" + +#define MAX_EVENTS 1024 +#define MAX_QUEUE_SIZE 1000 +#define MAX_TIMEOUT_MICROSEC 500000 + +#define READER_THREADS 6 +// That's a buffer size of 64 kB, to maximize performance without it being too big, according to testing +#define BUFFER_SIZE_WORDS 16384 + +int min_fd = MAX_EVENTS + 1; +int max_fd = 0; int makeSocket() { int sockfd; @@ -39,7 +68,7 @@ void bindSocketPort(int server_fd, int port) { } void startListening(int server_fd) { - if (listen(server_fd, 3) < 0) { + if (listen(server_fd, 20000) < 0) { perror("listen"); exit(EXIT_FAILURE); } @@ -54,72 +83,221 @@ int acceptConnection(int server_fd) { if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) { perror("accept"); exit(EXIT_FAILURE); + } else { + int flags = fcntl(client_fd, F_GETFL); + fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); } printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd); return client_fd; } +void acceptConnectionEpollStyle(int server_fd, int &efd) { + struct sockaddr_in new_remoteAddr; + int addrlen = sizeof(struct sockaddr_in); + + while (true) { + int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen); + + if (conn_sock == -1) { + // All incoming connections have been processed + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + break; + } else { + perror("accept"); + break; + } + } + + // make new connection non-blocking + int flags = fcntl(conn_sock, F_GETFL, 0); + fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK); + + // monitor new connection for read events, always in edge triggered + struct epoll_event event; + event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; + event.data.fd = conn_sock; + + // Allow epoll to monitor the new connection + if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) { + perror("epoll_ctl: conn_sock"); + break; + } + printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock); + if (max_fd < conn_sock) max_fd = conn_sock; + if (min_fd > conn_sock) min_fd = conn_sock; + + } +} + void term_handler(int signal) { printf("Terminated, received SIGNAL %d", signal); exit(EXIT_SUCCESS); } -/* -int main(int argc, char const *argv[]) { - signal(SIGTERM, term_handler); +std::array queues_mutexes; - if (argc != 2) { - printf("Usage: %s portNumber \n", argv[0]); - exit(EXIT_FAILURE); - } - int port = atoi(argv[1]); - printf("Start socket port %d\n", port); - - int server_fd = makeSocket(); - bindSocketPort(server_fd, port); - startListening(server_fd); - int client_fd = acceptConnection(server_fd); +void thradizable(int &epoll_fd, int &master_socket, std::array& queues_array, const int &th_flag, const int thread_index) { + epoll_event events[MAX_EVENTS]; while (true) { - uint32_t word; - ssize_t bytes = read(client_fd, &word, 4); - if (bytes != 4) { - perror("Receive failed"); + if (th_flag == 1) break; + // Time measurements + ///auto start = std::chrono::high_resolution_clock::now(); + + // Returns only the sockets for which there are events + //printf("Before wait\n"); + int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + //printf("After wait\n"); + if (nfds == -1) { + perror("epoll_wait"); exit(EXIT_FAILURE); } - printf("[RICEVUTO]\t0x%x\n", word); + + // Iterate on the sockets having events + for (int i = 0; i < nfds; i++) { + //printf("Tot fds = %d reading from %d\n", nfds, i); + int fd = events[i].data.fd; + if (fd == master_socket) { + // If the activity is on the master socket, than it's a new connection request + acceptConnectionEpollStyle(master_socket, epoll_fd); + + } else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { + // Than the client connection is closed, so I close it + printf("Closing %d", fd); + close(fd); + } else { + //printf("Ev trig th %d with tot b %lu\n", thread_index, part); + // Than we received data from one of the monitored sockets + uint32_t buffer[BUFFER_SIZE_WORDS]; + int valread = 0; + //while (valread != EAGAIN) { + std::unique_lock lk(queues_mutexes[fd]); + valread = recv(fd, reinterpret_cast(buffer), sizeof(buffer), 0); + if (valread > 0) { + //printf("[RICEVUTO]\t FROM %d\n", fd); + int opt_incr = (valread % 4) ? 1 : 0; + for (int q_i = 0; q_i < (valread/4) + opt_incr; q_i++) { + queues_array[fd].put(buffer[q_i]); + } + + /* + bytes_read += valread; + int kilos = 0; + if ((kilos = bytes_read / 1024) > 0) { + kBytes_read += kilos; + bytes_read -= (kilos * 1024); + //printf("reade bites %lu", bytes_read); + }*/ + } + lk.unlock(); + //} + + } + } + + ///auto end = std::chrono::high_resolution_clock::now(); + ///double time_taken = std::chrono::duration_cast(end - start).count(); + //time taken in milliseconds + /// + /*time_taken *= 1e-6; + total_time_taken += time_taken; + + if (total_time_taken > 3e4) { + times.push_back(total_time_taken); + tot_received_data.push_back(kBytes_read); + break; + }*/ + /// } +} - return 0; -}*/ -#define TRUE 1 -#define FALSE 0 +void builder_thread (std::array &queues_array, uint32_t &runNumber) { + uint32_t counter = 0; + while (1) { + FullEvent fullEvent; + fullEvent.headerSize = 5; + fullEvent.runNumber = runNumber; + fullEvent.eventNumber = counter; + + fullEvent.fragmentsArray = new Fragment[max_fd - min_fd + 1]; + + + for (int i = min_fd; i <= max_fd; i++){ + std::unique_lock lk(queues_mutexes[i]); + int inCurrentEventNumber = 0; + while (inCurrentEventNumber != 1) { + try { + uint32_t starter = queues_array[i].get(); + if (starter == FRAGMENT_HEADER_MARKER) { + uint32_t headerSize = queues_array[i].get(); + uint32_t fragmentSize = queues_array[i].get(); + uint32_t* buffer = new uint32_t[fragmentSize]; + buffer[0] = starter; + buffer[1] = headerSize; + buffer[2] = fragmentSize; + + for (int j = 3; j < fragmentSize; j++) { + buffer[j] = queues_array[i].get(); + } + + Fragment fragment = decode_fragment(buffer); + + if (fragment.header.detectorEventNumber < fullEvent.eventNumber) { + continue; + } else if (fragment.header.detectorEventNumber == fullEvent.eventNumber) { + inCurrentEventNumber = 1; + fullEvent.fragmentsArray[i - min_fd] = fragment; + + } else { + printf("È successo un cazzo di casino"); + } + } + } catch (const TimeoutException& ex) { + inCurrentEventNumber = 1; + Fragment fragment; + Header header; + header.sourceIdentifier = i - min_fd; + header.runNumber = runNumber; + header.detectorEventNumber = counter; + header.numberOfStatusElements = 1; + + uint32_t firstStatusElement = 0x0; + firstStatusElement |= TIMEOUT_ERROR; + + header.statusElementsArray = new uint32_t[header.numberOfStatusElements]; + header.statusElementsArray[0] = firstStatusElement; + + header.headerSize = 7 + header.numberOfStatusElements; + header.fragmentSize = header.headerSize; + + fragment.header = header; + + fullEvent.fragmentsArray[i - min_fd] = fragment; + } + } + lk.unlock(); + } + } +} + int main(int argc, char const *argv[]) { + signal(SIGTERM, term_handler); + if (argc != 2) { printf("Usage: %s portNumber \n", argv[0]); exit(EXIT_FAILURE); } int port = atoi(argv[1]); printf("Start socket port %d\n", port); + + int master_socket; + const int opt = 1; - int opt = TRUE; - int master_socket , addrlen , new_socket , client_socket[30] , - max_clients = 30 , activity, i , valread , sd; - int max_sd; - - //set of socket descriptors - fd_set readfds; - - //initialise all client_socket[] to 0 so not checked - for (i = 0; i < max_clients; i++) - { - client_socket[i] = 0; - } master_socket = makeSocket(); @@ -135,110 +313,53 @@ int main(int argc, char const *argv[]) { bindSocketPort(master_socket, port); startListening(master_socket); - while (true) { - //clear the socket set - FD_ZERO(&readfds); - - //add master socket to set - FD_SET(master_socket, &readfds); - max_sd = master_socket; + int flags = fcntl(master_socket, F_GETFL, 0); + fcntl(master_socket, F_SETFL, flags | O_NONBLOCK); - //add child sockets to set - for ( i = 0 ; i < max_clients ; i++) - { - //socket descriptor - sd = client_socket[i]; - - //if valid socket descriptor then add to read list - if(sd > 0) - FD_SET( sd , &readfds); - - //highest file descriptor number, need it for the select function - if(sd > max_sd) - max_sd = sd; - } - - //wait for an activity on one of the sockets , timeout is NULL , - //so wait indefinitely - activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL); - - if ((activity < 0) && (errno!=EINTR)) - { - printf("select error"); - } - - //If something happened on the master socket , - //then its an incoming connection - if (FD_ISSET(master_socket, &readfds)) - { - new_socket = acceptConnection(master_socket); - - //add new socket to array of sockets - for (i = 0; i < max_clients; i++) - { - //if position is empty - if( client_socket[i] == 0 ) - { - client_socket[i] = new_socket; - printf("Adding to list of sockets as %d\n" , i); - - break; - } - } - } - - //else its some IO operation on some other socket - for (i = 0; i < max_clients; i++) - { - sd = client_socket[i]; - - if (FD_ISSET( sd , &readfds)) - { - //Check if it was for closing , and also read the - //incoming message - uint32_t word; - if ((valread = recv( sd , &word, 4, 0)) == 0) - { - struct sockaddr_in address; - int addrlen; - //Somebody disconnected , get his details and print - getpeername(sd , (struct sockaddr*)&address , \ - (socklen_t*)&addrlen); - printf("Host disconnected , ip %s , port %d \n" , - inet_ntoa(address.sin_addr) , ntohs(address.sin_port)); - - printf("Disconnected fd %d", sd); - - //Close the socket and mark as 0 in list for reuse - close( sd ); - client_socket[i] = 0; - } - - //Echo back the message that came in - else - { - printf("[RICEVUTO]\t0x%x FROM %d\n", word, sd); - } - } - } + epoll_event ev, events[MAX_EVENTS]; + std::array kBytes_read_on_descr; + //The atomic here is used as a flag to tell the thread to stop + std::vector vThreads; + //create the epoll instance + int epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + printf("Failed to create epoll file descriptor\n"); + exit(EXIT_FAILURE); } - /* + ev.data.fd = master_socket; + // Reading events with edge triggered mode + ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; - int client_fd = acceptConnection(server_fd); + // Allowing epoll to monitor the master_socket + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){ + perror("epoll_ctl"); + exit(EXIT_FAILURE); + } + + std::array vT; + std::array thread_flags; - while (true) { - uint32_t word; - ssize_t bytes = read(client_fd, &word, 4); - if (bytes != 4) { - perror("Receive failed"); - exit(EXIT_FAILURE); - } - printf("[RICEVUTO]\t0x%x\n", word); - }*/ + // Creating the data structure and initialization with max size and timeout ez win + std::array queues_array; + for (auto& queue : queues_array) { + queue.init(MAX_QUEUE_SIZE, MAX_TIMEOUT_MICROSEC); + } + + for (int t_i = 0; t_i < READER_THREADS; t_i++) { + thread_flags[t_i] = 0; + vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::ref(queues_array), std::cref(thread_flags.at(t_i)), t_i); + } + //ADD CONSUMER THREAD + + + if (close(epoll_fd)) { + printf("Failed to close epoll file descriptor"); + exit(EXIT_FAILURE); + } return 0; } \ No newline at end of file diff --git a/full_event_format.h b/full_event_format.h index 97d6e37..fb8583d 100644 --- a/full_event_format.h +++ b/full_event_format.h @@ -10,6 +10,7 @@ typedef struct FullEvent { uint32_t headerSize; uint32_t eventSize; uint32_t runNumber; + uint32_t eventNumber; Fragment *fragmentsArray; diff --git a/gcc-13 b/gcc-13 new file mode 120000 index 0000000..fc9273d --- /dev/null +++ b/gcc-13 @@ -0,0 +1 @@ +/home/master-roby3/newgcc/gcc/bin/gcc \ No newline at end of file diff --git a/testing/queue_testing/a.out b/testing/queue_testing/a.out new file mode 100755 index 0000000..4944b86 Binary files /dev/null and b/testing/queue_testing/a.out differ diff --git a/testing/queue_testing/range_insertion.cxx b/testing/queue_testing/range_insertion.cxx new file mode 100644 index 0000000..840e7f8 --- /dev/null +++ b/testing/queue_testing/range_insertion.cxx @@ -0,0 +1,20 @@ +#include +#include +#include +#include + + +int main() { + std::queue queue; + std::array range = {1,2,3,4}; + + queue.push_range(range); + + while (!queue.empty()) { + std::cout << queue.front() << std::endl; + queue.pop(); + } + + + return 0; +} \ No newline at end of file diff --git a/testing/queue_testing/simple_test.cxx b/testing/queue_testing/simple_test.cxx new file mode 100644 index 0000000..740a373 --- /dev/null +++ b/testing/queue_testing/simple_test.cxx @@ -0,0 +1,25 @@ +#include +#include +#include +#include + +int main() { + + std::array, 3> arr_queues; + + for (int j = 0; j < 5; j++) { + for (int i = 0; i < 3; i++) { + arr_queues[i].push(i+j+10); + } + } + + for (int j = 0; j < 5; j++) { + for (int i = 0; i < 3; i++) { + std::cout << "Queue numebr " << i << " " << arr_queues[i].front() << std::endl; + arr_queues[i].pop(); + } + } + + + return 0; +} \ No newline at end of file