diff --git a/evBuild.out b/evBuild.out index 509385d..aca15c5 100755 Binary files a/evBuild.out and b/evBuild.out differ diff --git a/event_builder.cxx b/event_builder.cxx index 7b109ee..73cceb6 100644 --- a/event_builder.cxx +++ b/event_builder.cxx @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -36,7 +37,7 @@ #define MAX_EVENTS 1024 #define MAX_QUEUE_SIZE 1000 -#define MAX_TIMEOUT_MICROSEC 500000 +#define MAX_TIMEOUT_MICROSEC 50000000 #define READER_THREADS 6 // That's a buffer size of 64 kB, to maximize performance without it being too big, according to testing @@ -44,6 +45,10 @@ int min_fd = MAX_EVENTS + 1; int max_fd = 0; +int exp_num = 0; + +std::mutex conn_mutex; +std::condition_variable conn_cv; int makeSocket() { int sockfd; @@ -159,8 +164,10 @@ void thradizable(int &epoll_fd, int &master_socket, std::array conn_lk(conn_mutex); acceptConnectionEpollStyle(master_socket, epoll_fd); - + conn_lk.unlock(); + conn_cv.notify_all(); } else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { // Than the client connection is closed, so I close it printf("Closing %d", fd); @@ -216,6 +223,11 @@ void thradizable(int &epoll_fd, int &master_socket, std::array &queues_array, uint32_t &runNumber) { uint32_t counter = 0; while (1) { + std::unique_lock conn_lk(conn_mutex); + while ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) { + conn_cv.wait(conn_lk); + } + FullEvent fullEvent; fullEvent.headerSize = 5; fullEvent.runNumber = runNumber; @@ -277,8 +289,12 @@ void builder_thread (std::array &queues_array, uint fullEvent.fragmentsArray[i - min_fd] = fragment; } } + + printFullEvent(fullEvent); lk.unlock(); } + + conn_lk.unlock(); } } @@ -288,11 +304,13 @@ int main(int argc, char const *argv[]) { signal(SIGTERM, term_handler); - if (argc != 2) { - printf("Usage: %s portNumber \n", argv[0]); + if (argc != 4) { + printf("Usage: %s portNumber runNumber numClients\n", argv[0]); exit(EXIT_FAILURE); } int port = atoi(argv[1]); + uint32_t runNumber = atoi(argv[2]); + exp_num = atoi(argv[3]); printf("Start socket port %d\n", port); int master_socket; @@ -339,8 +357,8 @@ int main(int argc, char const *argv[]) { exit(EXIT_FAILURE); } - std::array vT; - std::array thread_flags; + std::array vT; + std::array thread_flags; // Creating the data structure and initialization with max size and timeout ez win std::array queues_array; @@ -353,8 +371,11 @@ int main(int argc, char const *argv[]) { vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::ref(queues_array), std::cref(thread_flags.at(t_i)), t_i); } + std::thread bdth(builder_thread, std::ref(queues_array), std::ref(runNumber)); + //ADD CONSUMER THREAD + bdth.join(); if (close(epoll_fd)) { printf("Failed to close epoll file descriptor"); diff --git a/fragment_dataformat.h b/fragment_dataformat.h index ece7d01..f4664ed 100644 --- a/fragment_dataformat.h +++ b/fragment_dataformat.h @@ -2,6 +2,7 @@ #include #include +#include #include #define FRAGMENT_HEADER_MARKER 0xee1234ee @@ -82,4 +83,26 @@ Fragment decode_fragment(uint32_t *buffer) { return fragment; } +void printFragment(Fragment &fragment) { + printf("NewFrag\n"); + printf("0x%08x", fragment.header.startOfHeaderMarker); + printf("0x%08x", fragment.header.headerSize); + printf("0x%08x", fragment.header.fragmentSize); + printf("0x%08x", fragment.header.sourceIdentifier); + printf("0x%08x", fragment.header.runNumber); + printf("0x%08x", fragment.header.detectorEventNumber); + printf("0x%08x", fragment.header.numberOfStatusElements); + + for (uint32_t i = 0; i < fragment.header.numberOfStatusElements; i++) { + printf("0x%08x", fragment.header.statusElementsArray[i]); + } + + uint32_t payloadSize = fragment.header.fragmentSize - fragment.header.headerSize; + for (uint32_t i = 0; i < payloadSize; i++) { + printf("0x%08x", fragment.payloadElements[i]); + } + + +} + diff --git a/full_event_format.h b/full_event_format.h index fb8583d..4257080 100644 --- a/full_event_format.h +++ b/full_event_format.h @@ -2,6 +2,7 @@ #include "fragment_dataformat.h" #include +#include #define FULL_EVENT_HEADER_MARKER 0xaa1234aa @@ -14,4 +15,26 @@ typedef struct FullEvent { Fragment *fragmentsArray; -}FullEvent; \ No newline at end of file +}FullEvent; + +void printFullEvent(FullEvent &fullEvent) { + printf("0x%08x\n", fullEvent.startOfHeaderMarker); + printf("0x%08x\n", fullEvent.headerSize); + printf("0x%08x\n", fullEvent.eventSize); + printf("0x%08x\n", fullEvent.runNumber); + printf("0x%08x\n", fullEvent.eventNumber); + + printf("\n"); + + + uint32_t remainingPayloadSize = fullEvent.eventSize - fullEvent.headerSize; + uint32_t counter = 0; + while (remainingPayloadSize > 0) { + printFragment(fullEvent.fragmentsArray[counter]); + remainingPayloadSize -= fullEvent.fragmentsArray[counter].header.fragmentSize; + counter++; + printf("\n"); + } + + printf("\n\n\n"); +} \ No newline at end of file diff --git a/prov.out b/prov.out index be6238f..cbb2246 100755 Binary files a/prov.out and b/prov.out differ diff --git a/provider.cxx b/provider.cxx index bd5c602..04cc27c 100644 --- a/provider.cxx +++ b/provider.cxx @@ -68,7 +68,7 @@ int main(int argc, char* argv[]) { std::discrete_distribution discrete_distr({0.9, 0.1}); std::normal_distribution normal_distr(25., 10.); std::uniform_int_distribution unif_int_distib; - std::uniform_real_distribution unif_float_distrib(0.5, 2); + std::uniform_real_distribution unif_float_distrib(2, 5); //commenting all old_style generation //srand (static_cast (42)); diff --git a/spawn_clients.sh b/spawn_clients.sh index 57caa28..d52f3b4 100755 --- a/spawn_clients.sh +++ b/spawn_clients.sh @@ -7,6 +7,6 @@ then for i in $(seq 1 $4); do echo "Spawning provider number $i" - ./prov.out $1 $2 $3 & + ./prov.out $1 $2 $i $3 & done fi