diff --git a/ControlledQueue.cxx b/ControlledQueue.cxx index e0a30d4..89236d8 100644 --- a/ControlledQueue.cxx +++ b/ControlledQueue.cxx @@ -1,5 +1,6 @@ #include #include +#include #include #include "ControlledQueue.h" @@ -37,9 +38,13 @@ void ControlledQueue::put(uint32_t word) { uint32_t ControlledQueue::get() { std::unique_lock lk(m_mtx); - if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), [&]() { return !(m_queue.empty()); } ) ) { - throw TimeoutException(); + while ( m_queue.empty() ) { + if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), [&]() { return !(m_queue.empty()); } ) ) { + throw TimeoutException(); + } } + + uint32_t word = m_queue.front(); m_queue.pop(); diff --git a/a.out b/a.out index 37154d7..dfa8d34 100755 Binary files a/a.out and b/a.out differ diff --git a/core b/core index c5a5dc1..c19bbdc 100644 Binary files a/core and b/core differ diff --git a/evBuild.out b/evBuild.out index aca15c5..39dcb48 100755 Binary files a/evBuild.out and b/evBuild.out differ diff --git a/event_builder.cxx b/event_builder.cxx index 73cceb6..37ed714 100644 --- a/event_builder.cxx +++ b/event_builder.cxx @@ -36,7 +36,7 @@ #include "ControlledQueue.h" #define MAX_EVENTS 1024 -#define MAX_QUEUE_SIZE 1000 +#define MAX_QUEUE_SIZE 1000000 #define MAX_TIMEOUT_MICROSEC 50000000 #define READER_THREADS 6 @@ -176,10 +176,9 @@ void thradizable(int &epoll_fd, int &master_socket, std::array lk(queues_mutexes[fd]); - valread = recv(fd, reinterpret_cast(buffer), sizeof(buffer), 0); + ssize_t valread = recv(fd, reinterpret_cast(buffer), sizeof(buffer), 0); if (valread > 0) { //printf("[RICEVUTO]\t FROM %d\n", fd); int opt_incr = (valread % 4) ? 1 : 0; @@ -221,11 +220,16 @@ void thradizable(int &epoll_fd, int &master_socket, std::array &queues_array, uint32_t &runNumber) { + uint32_t counter = 0; while (1) { - std::unique_lock conn_lk(conn_mutex); - while ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) { - conn_cv.wait(conn_lk); + if ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) { + std::unique_lock conn_lk(conn_mutex); + while ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) { + conn_cv.wait(conn_lk); + } + + conn_lk.unlock(); } FullEvent fullEvent; @@ -235,13 +239,16 @@ void builder_thread (std::array &queues_array, uint fullEvent.fragmentsArray = new Fragment[max_fd - min_fd + 1]; + uint32_t fullEventPayloadSize = 0; + for (int i = min_fd; i <= max_fd; i++){ - std::unique_lock lk(queues_mutexes[i]); + //std::unique_lock lk(queues_mutexes[i]); int inCurrentEventNumber = 0; while (inCurrentEventNumber != 1) { try { uint32_t starter = queues_array[i].get(); + if (starter == FRAGMENT_HEADER_MARKER) { uint32_t headerSize = queues_array[i].get(); uint32_t fragmentSize = queues_array[i].get(); @@ -250,26 +257,31 @@ void builder_thread (std::array &queues_array, uint buffer[1] = headerSize; buffer[2] = fragmentSize; + for (int j = 3; j < fragmentSize; j++) { buffer[j] = queues_array[i].get(); } - Fragment fragment = decode_fragment(buffer); - - if (fragment.header.detectorEventNumber < fullEvent.eventNumber) { + if (getDetectorEventNumber(buffer) < fullEvent.eventNumber) { continue; - } else if (fragment.header.detectorEventNumber == fullEvent.eventNumber) { + } else if (getDetectorEventNumber(buffer) == fullEvent.eventNumber) { + + Fragment& fragment = fullEvent.fragmentsArray[i - min_fd]; + decode_fragment(buffer, fragment); inCurrentEventNumber = 1; - fullEvent.fragmentsArray[i - min_fd] = fragment; + + fullEventPayloadSize += fragment.header.fragmentSize; } else { printf("È successo un cazzo di casino"); } + delete [] buffer; } } catch (const TimeoutException& ex) { + printf("Ho catchato\n"); inCurrentEventNumber = 1; - Fragment fragment; - Header header; + Fragment& fragment = fullEvent.fragmentsArray[i - min_fd]; + Header& header = fragment.header; header.sourceIdentifier = i - min_fd; header.runNumber = runNumber; header.detectorEventNumber = counter; @@ -284,17 +296,20 @@ void builder_thread (std::array &queues_array, uint header.headerSize = 7 + header.numberOfStatusElements; header.fragmentSize = header.headerSize; - fragment.header = header; + //fragment.header = header; - fullEvent.fragmentsArray[i - min_fd] = fragment; + //fullEvent.fragmentsArray[i - min_fd] = fragment; + fullEventPayloadSize += fragment.header.fragmentSize; } } - printFullEvent(fullEvent); - lk.unlock(); + //lk.unlock(); } - conn_lk.unlock(); + fullEvent.eventSize = fullEvent.headerSize + fullEventPayloadSize; + printFullEvent(fullEvent); + counter++; + } } @@ -369,6 +384,7 @@ int main(int argc, char const *argv[]) { for (int t_i = 0; t_i < READER_THREADS; t_i++) { thread_flags[t_i] = 0; vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::ref(queues_array), std::cref(thread_flags.at(t_i)), t_i); + //vT[t_i].detach(); } std::thread bdth(builder_thread, std::ref(queues_array), std::ref(runNumber)); @@ -376,6 +392,7 @@ int main(int argc, char const *argv[]) { //ADD CONSUMER THREAD bdth.join(); + //sleep(30); if (close(epoll_fd)) { printf("Failed to close epoll file descriptor"); diff --git a/fragment_dataformat.h b/fragment_dataformat.h index f4664ed..a4e9b93 100644 --- a/fragment_dataformat.h +++ b/fragment_dataformat.h @@ -25,11 +25,27 @@ typedef struct Header { << std::setw(8) << std::setfill('0') << header.detectorEventNumber << '\n' << std::setw(8) << std::setfill('0') << header.numberOfStatusElements << '\n'; }*/ + + public: + Header() : statusElementsArray(NULL) {} + ~Header() { + if (statusElementsArray != NULL) { + delete [] statusElementsArray; + } + } } Header; typedef struct Fragment { Header header; uint32_t *payloadElements; + + public: + Fragment() : header(), payloadElements(NULL) {} + ~Fragment() { + if (payloadElements != NULL) { + delete [] payloadElements; + } + } } Fragment; enum ERROR_CODES { @@ -58,8 +74,11 @@ void encode_fragment(uint32_t *buffer, const Fragment &fragment){ } -Fragment decode_fragment(uint32_t *buffer) { - Fragment fragment; +uint32_t getDetectorEventNumber(uint32_t *buffer) { + return buffer[5]; +} + +void decode_fragment(uint32_t *buffer, Fragment &fragment) { fragment.header.startOfHeaderMarker = buffer[0]; fragment.header.headerSize = buffer[1]; fragment.header.fragmentSize = buffer[2]; @@ -69,37 +88,36 @@ Fragment decode_fragment(uint32_t *buffer) { fragment.header.numberOfStatusElements = buffer[6]; uint32_t nStatusElements = fragment.header.numberOfStatusElements; + printf("Status elements bitch: %d\n", nStatusElements); fragment.header.statusElementsArray = new uint32_t[nStatusElements]; for (int i = 1; i <= nStatusElements; i++) { - fragment.header.statusElementsArray[6+i] = buffer[6+i]; + fragment.header.statusElementsArray[i-1] = buffer[6+i]; } uint32_t payload_size = fragment.header.fragmentSize - fragment.header.headerSize; fragment.payloadElements = new uint32_t[payload_size]; for (int i = 1; i <= payload_size; i++) { - fragment.payloadElements[6+nStatusElements+i] = buffer[6+nStatusElements+i]; + fragment.payloadElements[i-1] = buffer[6+nStatusElements+i]; } - - return fragment; } void printFragment(Fragment &fragment) { printf("NewFrag\n"); - printf("0x%08x", fragment.header.startOfHeaderMarker); - printf("0x%08x", fragment.header.headerSize); - printf("0x%08x", fragment.header.fragmentSize); - printf("0x%08x", fragment.header.sourceIdentifier); - printf("0x%08x", fragment.header.runNumber); - printf("0x%08x", fragment.header.detectorEventNumber); - printf("0x%08x", fragment.header.numberOfStatusElements); + printf("0x%08x\n", fragment.header.startOfHeaderMarker); + printf("0x%08x\n", fragment.header.headerSize); + printf("0x%08x\n", fragment.header.fragmentSize); + printf("0x%08x\n", fragment.header.sourceIdentifier); + printf("0x%08x\n", fragment.header.runNumber); + printf("0x%08x\n", fragment.header.detectorEventNumber); + printf("0x%08x\n", fragment.header.numberOfStatusElements); for (uint32_t i = 0; i < fragment.header.numberOfStatusElements; i++) { - printf("0x%08x", fragment.header.statusElementsArray[i]); + printf("0x%08x\n", fragment.header.statusElementsArray[i]); } uint32_t payloadSize = fragment.header.fragmentSize - fragment.header.headerSize; for (uint32_t i = 0; i < payloadSize; i++) { - printf("0x%08x", fragment.payloadElements[i]); + printf("0x%08x\n", fragment.payloadElements[i]); } diff --git a/full_event_format.h b/full_event_format.h index 4257080..101aa66 100644 --- a/full_event_format.h +++ b/full_event_format.h @@ -15,6 +15,14 @@ typedef struct FullEvent { Fragment *fragmentsArray; + public: + FullEvent() : fragmentsArray(NULL) {} + ~FullEvent() { + if (fragmentsArray != NULL) { + delete [] fragmentsArray; + } + } + }FullEvent; void printFullEvent(FullEvent &fullEvent) { diff --git a/prov.out b/prov.out index cbb2246..0ba4ddc 100755 Binary files a/prov.out and b/prov.out differ diff --git a/provider.cxx b/provider.cxx index 04cc27c..f75afac 100644 --- a/provider.cxx +++ b/provider.cxx @@ -68,7 +68,7 @@ int main(int argc, char* argv[]) { std::discrete_distribution discrete_distr({0.9, 0.1}); std::normal_distribution normal_distr(25., 10.); std::uniform_int_distribution unif_int_distib; - std::uniform_real_distribution unif_float_distrib(2, 5); + std::uniform_real_distribution unif_float_distrib(1, 2); //commenting all old_style generation //srand (static_cast (42)); @@ -93,7 +93,8 @@ int main(int argc, char* argv[]) { connectTo(socket, host, port); while (true) { - Header header; + Fragment fragment; + Header& header = fragment.header; header.sourceIdentifier = source_id; header.runNumber = run_number; header.detectorEventNumber = event_number; @@ -122,8 +123,6 @@ int main(int argc, char* argv[]) { header.headerSize = 7 + header.numberOfStatusElements; header.fragmentSize = header.headerSize + payload_size; - Fragment fragment; - fragment.header = header; fragment.payloadElements = new uint32_t[payload_size]; for (uint32_t i = 0; i < payload_size; i++) { fragment.payloadElements[i] = unif_int_distib(generator); @@ -131,7 +130,9 @@ int main(int argc, char* argv[]) { uint32_t buffer[header.fragmentSize]; encode_fragment(buffer, fragment); + //printFragment(fragment); ssize_t bytes = send(socket, reinterpret_cast(buffer), sizeof(buffer), 0); + //printf("sent bytes %zd\n", bytes); if (bytes != header.fragmentSize * sizeof(uint32_t)) { perror("Send failed: num bytes not matching"); exit(EXIT_FAILURE); @@ -148,13 +149,8 @@ int main(int argc, char* argv[]) { //} //std::cout << std::endl; - if (header.numberOfStatusElements > 0){ - delete [] fragment.header.statusElementsArray; - } - delete [] fragment.payloadElements; - - sleep(unif_float_distrib(generator)); + usleep(static_cast(unif_float_distrib(generator) * 1e6)); } return 0; diff --git a/trialsFullEvent.cxx b/trialsFullEvent.cxx new file mode 100644 index 0000000..af450ed --- /dev/null +++ b/trialsFullEvent.cxx @@ -0,0 +1,40 @@ + + +#include "fragment_dataformat.h" +#include "full_event_format.h" +#include +int main() { + FullEvent fullEvent; + fullEvent.headerSize = 5; + fullEvent.runNumber = 5; + fullEvent.eventNumber = 1; + fullEvent.fragmentsArray = new Fragment[5]; + + uint32_t fullEventPayloadSize = 0; + + + for (int i = 0; i <5; i++){ + uint32_t starter = 1; + uint32_t headerSize = 7; + uint32_t fragmentSize = 10; + uint32_t* buffer = new uint32_t[fragmentSize]; + buffer[0] = starter; + buffer[1] = headerSize; + buffer[2] = fragmentSize; + + for (int j = 3; j < fragmentSize; j++) { + buffer[j] = 0; + } + + printf("ecchecazzo\n"); + + Fragment& fragment = fullEvent.fragmentsArray[i]; + decode_fragment(buffer, fragment); + + printf("ecchecazzo\n"); + + fullEventPayloadSize += fragment.header.fragmentSize; + delete [] buffer; + printf("codiaz\n"); + } +} \ No newline at end of file diff --git a/tries.out b/tries.out new file mode 100755 index 0000000..dc528aa Binary files /dev/null and b/tries.out differ