diff --git a/backup/ControlledQueue.cxx b/backup/ControlledQueue.cxx new file mode 100644 index 0000000..3340333 --- /dev/null +++ b/backup/ControlledQueue.cxx @@ -0,0 +1,44 @@ +#include "ControlledQueue.h" +#include +#include + +ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;} + +void ControlledQueue::put(Fragment fragment) { + std::unique_lock lk(m_mtx); + + /* + This while synthax is explicitly used to avoid unwanted spurious awakenings. + I'm keeping only the wait with not timeout to transmit backpressure to the upper reading thread, which blocks. + Since that thread has locked the mutex corresponding to the fd, basically the whole fd is blocked. + That way I can tranmit backpressure with TCP to the sending client. + */ + while (! (m_queue.size() < m_maxSize) ) { + m_cv.wait(lk); + } + + m_queue.push(fragment); + + lk.unlock(); + m_cv.notify_all(); +} + +/* + Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element. + That way it can be catched from the main program that can set the error code accordingly. +*/ +Fragment ControlledQueue::get() { + std::unique_lock lk(m_mtx); + + if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) { + throw "Get Timeout"; + } + + Fragment fragment = m_queue.front(); + m_queue.pop(); + + lk.unlock(); + m_cv.notify_all(); + + return fragment; +} \ No newline at end of file diff --git a/backup/ControlledQueue.h b/backup/ControlledQueue.h new file mode 100644 index 0000000..a3d1bc9 --- /dev/null +++ b/backup/ControlledQueue.h @@ -0,0 +1,28 @@ +#pragma once + +#include + +#include +#include +#include + +#include "fragment_dataformat.h" + + + +class ControlledQueue { + public: + ControlledQueue(uint32_t maxSize, int timeoutMicroseconds); + + void put(Fragment fragment); + Fragment get(); + bool empty() { return m_queue.empty(); } + + private: + std::mutex m_mtx; + std::condition_variable m_cv; + uint32_t m_maxSize; + std::queue m_queue; + int m_timeoutMicroseconds; + +}; \ No newline at end of file diff --git a/backup/Progetto.code-workspace b/backup/Progetto.code-workspace new file mode 100644 index 0000000..bab1b7f --- /dev/null +++ b/backup/Progetto.code-workspace @@ -0,0 +1,8 @@ +{ + "folders": [ + { + "path": ".." + } + ], + "settings": {} +} \ No newline at end of file diff --git a/backup/README.md b/backup/README.md new file mode 100644 index 0000000..4663ac1 --- /dev/null +++ b/backup/README.md @@ -0,0 +1,2 @@ +# Event Builder Project +This repo contains the code for a simple Event Builder inspired by ATLAS event format. It's a project for the exam "Tecniche Digitali di Acquisizione Dati" @UNIPV. diff --git a/backup/a.out b/backup/a.out new file mode 100755 index 0000000..37154d7 Binary files /dev/null and b/backup/a.out differ diff --git a/backup/core b/backup/core new file mode 100644 index 0000000..c5a5dc1 Binary files /dev/null and b/backup/core differ diff --git a/backup/evBuild.out b/backup/evBuild.out new file mode 100755 index 0000000..679aaa5 Binary files /dev/null and b/backup/evBuild.out differ diff --git a/backup/event_builder.cxx b/backup/event_builder.cxx new file mode 100644 index 0000000..ea35e72 --- /dev/null +++ b/backup/event_builder.cxx @@ -0,0 +1,244 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void bindSocketPort(int server_fd, int port) { + struct sockaddr_in localAddr; + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(port); + + if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { + perror("bind failed"); + exit(EXIT_FAILURE); + } + printf("FD %d bound to port %d\n", server_fd, port); +} + +void startListening(int server_fd) { + if (listen(server_fd, 3) < 0) { + perror("listen"); + exit(EXIT_FAILURE); + } + printf("FD %d listening to new connections\n", server_fd); +} + +int acceptConnection(int server_fd) { + int client_fd; + struct sockaddr_in remoteAddr; + + size_t addrlen = sizeof(remoteAddr); + if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) { + perror("accept"); + exit(EXIT_FAILURE); + } + printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd); + return client_fd; +} + +void term_handler(int signal) { + printf("Terminated, received SIGNAL %d", signal); + exit(EXIT_SUCCESS); +} + +/* +int main(int argc, char const *argv[]) { + signal(SIGTERM, term_handler); + + if (argc != 2) { + printf("Usage: %s portNumber \n", argv[0]); + exit(EXIT_FAILURE); + } + int port = atoi(argv[1]); + printf("Start socket port %d\n", port); + + int server_fd = makeSocket(); + bindSocketPort(server_fd, port); + startListening(server_fd); + int client_fd = acceptConnection(server_fd); + + while (true) { + uint32_t word; + ssize_t bytes = read(client_fd, &word, 4); + if (bytes != 4) { + perror("Receive failed"); + exit(EXIT_FAILURE); + } + printf("[RICEVUTO]\t0x%x\n", word); + } + + + return 0; +}*/ + +#define TRUE 1 +#define FALSE 0 + +int main(int argc, char const *argv[]) { + signal(SIGTERM, term_handler); + + if (argc != 2) { + printf("Usage: %s portNumber \n", argv[0]); + exit(EXIT_FAILURE); + } + int port = atoi(argv[1]); + printf("Start socket port %d\n", port); + + int opt = TRUE; + int master_socket , addrlen , new_socket , client_socket[30] , + max_clients = 30 , activity, i , valread , sd; + int max_sd; + + //set of socket descriptors + fd_set readfds; + + //initialise all client_socket[] to 0 so not checked + for (i = 0; i < max_clients; i++) + { + client_socket[i] = 0; + } + + master_socket = makeSocket(); + + //set master socket to allow multiple connections , + //this is just a good habit, it will work without this + if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, + sizeof(opt)) < 0 ) + { + perror("setsockopt"); + exit(EXIT_FAILURE); + } + + bindSocketPort(master_socket, port); + startListening(master_socket); + + while (true) { + //clear the socket set + FD_ZERO(&readfds); + + //add master socket to set + FD_SET(master_socket, &readfds); + max_sd = master_socket; + + //add child sockets to set + for ( i = 0 ; i < max_clients ; i++) + { + //socket descriptor + sd = client_socket[i]; + + //if valid socket descriptor then add to read list + if(sd > 0) + FD_SET( sd , &readfds); + + //highest file descriptor number, need it for the select function + if(sd > max_sd) + max_sd = sd; + } + + //wait for an activity on one of the sockets , timeout is NULL , + //so wait indefinitely + activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL); + + if ((activity < 0) && (errno!=EINTR)) + { + printf("select error"); + } + + //If something happened on the master socket , + //then its an incoming connection + if (FD_ISSET(master_socket, &readfds)) + { + new_socket = acceptConnection(master_socket); + + //add new socket to array of sockets + for (i = 0; i < max_clients; i++) + { + //if position is empty + if( client_socket[i] == 0 ) + { + client_socket[i] = new_socket; + printf("Adding to list of sockets as %d\n" , i); + + break; + } + } + } + + //else its some IO operation on some other socket + for (i = 0; i < max_clients; i++) + { + sd = client_socket[i]; + + if (FD_ISSET( sd , &readfds)) + { + //Check if it was for closing , and also read the + //incoming message + uint32_t word; + if ((valread = recv( sd , &word, 4, 0)) == 0) + { + struct sockaddr_in address; + int addrlen; + //Somebody disconnected , get his details and print + getpeername(sd , (struct sockaddr*)&address , \ + (socklen_t*)&addrlen); + printf("Host disconnected , ip %s , port %d \n" , + inet_ntoa(address.sin_addr) , ntohs(address.sin_port)); + + printf("Disconnected fd %d", sd); + + //Close the socket and mark as 0 in list for reuse + close( sd ); + client_socket[i] = 0; + } + + //Echo back the message that came in + else + { + printf("[RICEVUTO]\t0x%x FROM %d\n", word, sd); + } + } + } + + + } + + /* + + int client_fd = acceptConnection(server_fd); + + while (true) { + uint32_t word; + ssize_t bytes = read(client_fd, &word, 4); + if (bytes != 4) { + perror("Receive failed"); + exit(EXIT_FAILURE); + } + printf("[RICEVUTO]\t0x%x\n", word); + }*/ + + + return 0; +} \ No newline at end of file diff --git a/backup/fragment_dataformat.h b/backup/fragment_dataformat.h new file mode 100644 index 0000000..ece7d01 --- /dev/null +++ b/backup/fragment_dataformat.h @@ -0,0 +1,85 @@ +#pragma once + +#include +#include +#include + +#define FRAGMENT_HEADER_MARKER 0xee1234ee + +typedef struct Header { + uint32_t startOfHeaderMarker = FRAGMENT_HEADER_MARKER; + uint32_t headerSize; + uint32_t fragmentSize; + uint32_t sourceIdentifier; + uint32_t runNumber; + uint32_t detectorEventNumber; + uint32_t numberOfStatusElements; + uint32_t *statusElementsArray; + /*friend std::ostream& operator <<(std::ostream& os, Header const& header) + { + return os << std::setw(8) << std::setfill('0') << header.startOfHeaderMarker << '\n' + << std::setw(8) << std::setfill('0') << header.headerSize << '\n' + << std::setw(8) << std::setfill('0') << header.fragmentSize << '\n' + << std::setw(8) << std::setfill('0') << header.runNumber << '\n' + << std::setw(8) << std::setfill('0') << header.detectorEventNumber << '\n' + << std::setw(8) << std::setfill('0') << header.numberOfStatusElements << '\n'; + }*/ +} Header; + +typedef struct Fragment { + Header header; + uint32_t *payloadElements; +} Fragment; + +enum ERROR_CODES { + INCORRECT_ERROR = (1 << 0), + CORRUPTED_ERROR = (1 << 1), + MISSING_DATA_ERROR = (1 << 2), + TIMEOUT_ERROR = (1 << 3) +}; + +void encode_header(uint32_t *buffer, const Header &header) { + buffer[0] = header.startOfHeaderMarker; + buffer[1] = header.headerSize; + buffer[2] = header.fragmentSize; + buffer[3] = header.sourceIdentifier; + buffer[4] = header.runNumber; + buffer[5] = header.detectorEventNumber; + buffer[6] = header.numberOfStatusElements; + + std::memcpy(&buffer[7], header.statusElementsArray, header.numberOfStatusElements * sizeof(uint32_t)); +} + + +void encode_fragment(uint32_t *buffer, const Fragment &fragment){ + encode_header(buffer, fragment.header); + std::memcpy(&buffer[fragment.header.headerSize], fragment.payloadElements, (fragment.header.fragmentSize - fragment.header.headerSize) * sizeof(uint32_t)); +} + + +Fragment decode_fragment(uint32_t *buffer) { + Fragment fragment; + fragment.header.startOfHeaderMarker = buffer[0]; + fragment.header.headerSize = buffer[1]; + fragment.header.fragmentSize = buffer[2]; + fragment.header.sourceIdentifier = buffer[3]; + fragment.header.runNumber = buffer[4]; + fragment.header.detectorEventNumber = buffer[5]; + fragment.header.numberOfStatusElements = buffer[6]; + + uint32_t nStatusElements = fragment.header.numberOfStatusElements; + fragment.header.statusElementsArray = new uint32_t[nStatusElements]; + for (int i = 1; i <= nStatusElements; i++) { + fragment.header.statusElementsArray[6+i] = buffer[6+i]; + } + + uint32_t payload_size = fragment.header.fragmentSize - fragment.header.headerSize; + fragment.payloadElements = new uint32_t[payload_size]; + for (int i = 1; i <= payload_size; i++) { + fragment.payloadElements[6+nStatusElements+i] = buffer[6+nStatusElements+i]; + } + + return fragment; +} + + diff --git a/backup/full_event_format.h b/backup/full_event_format.h new file mode 100644 index 0000000..97d6e37 --- /dev/null +++ b/backup/full_event_format.h @@ -0,0 +1,16 @@ +#pragma once + +#include "fragment_dataformat.h" +#include + +#define FULL_EVENT_HEADER_MARKER 0xaa1234aa + +typedef struct FullEvent { + uint32_t startOfHeaderMarker = FULL_EVENT_HEADER_MARKER; + uint32_t headerSize; + uint32_t eventSize; + uint32_t runNumber; + + Fragment *fragmentsArray; + +}FullEvent; \ No newline at end of file diff --git a/backup/prov.out b/backup/prov.out new file mode 100755 index 0000000..466593a Binary files /dev/null and b/backup/prov.out differ diff --git a/backup/prova.cpp b/backup/prova.cpp new file mode 100644 index 0000000..8d79f0f --- /dev/null +++ b/backup/prova.cpp @@ -0,0 +1,5 @@ +#include + +int main() { + return 0; +} \ No newline at end of file diff --git a/backup/provider.cxx b/backup/provider.cxx new file mode 100644 index 0000000..f5086d9 --- /dev/null +++ b/backup/provider.cxx @@ -0,0 +1,161 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +#include "fragment_dataformat.h" + +#define MIN_FRAGMENTS 0 +#define MAX_FRAGMENTS 100 + + +/* +int rndError(float p){ + float rndFloat = static_cast(rand()) / static_cast(RAND_MAX); + return rndFloat < p; +}*/ + + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void connectTo(int sock, const char* host, int port) { + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host); + serv_addr.sin_port = htons(port); + + if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + perror("Connection failed"); + exit(EXIT_FAILURE); + } + + printf("Connected to %s: %d\n", host, port); +} + + +void term_handler(int signal) { + printf("Terminated, received SIGNAL %d", signal); + exit(EXIT_SUCCESS); +} + + +int main(int argc, char* argv[]) { + signal(SIGTERM, term_handler); + signal(SIGINT, term_handler); + + //will use it later to obtain random number from hardware + std::random_device rd; + std::mt19937 generator(42); + std::discrete_distribution discrete_distr({0.9, 0.1}); + std::normal_distribution normal_distr(25., 10.); + std::uniform_int_distribution unif_int_distib; + std::uniform_real_distribution unif_float_distrib(0.5, 2); + + //commenting all old_style generation + //srand (static_cast (42)); + + bool out_condition = true; + uint32_t event_number = 0; + + uint32_t run_number = 0; + uint32_t source_id = 0; + + if (argc != 5) { + printf("Usage %s host portNumber sourceId runNumber\n", argv[0]); + exit(EXIT_FAILURE); + } + + const char* host = argv[1]; + int port = atoi(argv[2]); + source_id = atoi(argv[3]); + run_number = atoi(argv[4]); + + int socket = makeSocket(); + connectTo(socket, host, port); + + while (true) { + Header header; + header.sourceIdentifier = source_id; + header.runNumber = run_number; + header.detectorEventNumber = event_number; + event_number++; + header.numberOfStatusElements = 0; + //if swap to old style just change with function above and prob 0.1 for 1 + uint8_t incorrectError = discrete_distr(generator); + uint8_t corruptedError = discrete_distr(generator); + uint8_t missingDataError = discrete_distr(generator); + uint8_t timeoutError = discrete_distr(generator); + + uint32_t firstStatusElement = 0x0; + + if (incorrectError) firstStatusElement |= INCORRECT_ERROR; + if (corruptedError) firstStatusElement |= CORRUPTED_ERROR; + if (missingDataError) firstStatusElement |= MISSING_DATA_ERROR; + if (timeoutError) firstStatusElement |= TIMEOUT_ERROR; + + if (firstStatusElement != 0x0) { + header.numberOfStatusElements = 1; + header.statusElementsArray = new uint32_t[header.numberOfStatusElements]; + header.statusElementsArray[0] = firstStatusElement; + } + + uint32_t payload_size = std::max(std::min(static_cast(std::round(normal_distr(generator))), MAX_FRAGMENTS), MIN_FRAGMENTS); + header.headerSize = 7 + header.numberOfStatusElements; + header.fragmentSize = header.headerSize + payload_size; + + Fragment fragment; + fragment.header = header; + fragment.payloadElements = new uint32_t[payload_size]; + for (uint32_t i = 0; i < payload_size; i++) { + fragment.payloadElements[i] = unif_int_distib(generator); + } + + uint32_t buffer[header.fragmentSize]; + encode_fragment(buffer, fragment); + ssize_t bytes = send(socket, reinterpret_cast(buffer), sizeof(buffer), 0); + if (bytes != header.fragmentSize * sizeof(uint32_t)) { + perror("Send failed: num bytes not matching"); + exit(EXIT_FAILURE); + } + + + //std::cout << std::hex << std::setw(8) << std::setfill('0'); + ////std::cout << fragment.header; + //for (uint8_t i = 0; i < fragment.header.numberOfStatusElements; i++) { + // std::cout << "status element " << std::setw(8) << std::setfill('0') << fragment.header.statusElementsArray[i] << std::endl; + //} + //for (uint32_t i = 0; i < payload_size; i++) { + // std::cout << "payload element " << std::setw(8) << std::setfill('0') << fragment.payloadElements[i] << std::endl; + //} + //std::cout << std::endl; + + if (header.numberOfStatusElements > 0){ + delete [] fragment.header.statusElementsArray; + } + delete [] fragment.payloadElements; + + + sleep(unif_float_distrib(generator)); + } + + return 0; +} \ No newline at end of file diff --git a/backup/spawn_clients.sh b/backup/spawn_clients.sh new file mode 100755 index 0000000..57caa28 --- /dev/null +++ b/backup/spawn_clients.sh @@ -0,0 +1,12 @@ +#!/bin/bash +echo "Usage: $0 host port runNumber numberOfProviders" + +if [ $# -eq 4 ] +then + echo "Selected host: $1:$2 runNumber: $3 numberOfProviders: $4" + for i in $(seq 1 $4); + do + echo "Spawning provider number $i" + ./prov.out $1 $2 $3 & + done +fi diff --git a/testing/is_epoll_really_thsafe/exec_order_test.sh b/testing/is_epoll_really_thsafe/exec_order_test.sh index 63aa618..52e4dba 100755 --- a/testing/is_epoll_really_thsafe/exec_order_test.sh +++ b/testing/is_epoll_really_thsafe/exec_order_test.sh @@ -11,4 +11,3 @@ do kill -15 `pidof ./prov.out` sleep 1 done -