SimpleEventBuilder/event_builder.cxx

466 lines
15 KiB
C++

#include <cerrno>
#include <condition_variable>
#include <cstddef>
#include <cinttypes>
#include <cstdint>
#include <cstdio> // for fprintf()
#include <functional>
#include <iostream>
#include <queue>
#include <ratio>
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <cstring> // for strncmp
//my addition to the online guide
#include <csignal>
#include <cstdlib>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <utility>
#include <vector>
#include <chrono>
#include <fstream>
#include <thread>
#include <mutex>
#include <atomic>
#include <array>
#include <numeric>
#include "fragment_dataformat.h"
#include "full_event_format.h"
#include "TimeoutException.h"
#include "ControlledQueue.h"
#define MAX_EVENTS 1024
#define MAX_QUEUE_SIZE 1000000
#define READER_THREADS 6
// That's a buffer size of 64 kB, to maximize performance without it being too big, according to testing
#define BUFFER_SIZE_WORDS 16384
int min_fd = MAX_EVENTS + 1;
int max_fd = 0;
int exp_num = 0;
std::mutex conn_mutex;
std::condition_variable conn_cv;
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void bindSocketPort(int server_fd, int port) {
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("FD %d bound to port %d\n", server_fd, port);
}
void startListening(int server_fd) {
if (listen(server_fd, 20000) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
printf("FD %d listening to new connections\n", server_fd);
}
int acceptConnection(int server_fd) {
int client_fd;
struct sockaddr_in remoteAddr;
size_t addrlen = sizeof(remoteAddr);
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
} else {
int flags = fcntl(client_fd, F_GETFL);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void acceptConnectionEpollStyle(int server_fd, int &efd) {
struct sockaddr_in new_remoteAddr;
int addrlen = sizeof(struct sockaddr_in);
while (true) {
int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen);
if (conn_sock == -1) {
// All incoming connections have been processed
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}
// make new connection non-blocking
int flags = fcntl(conn_sock, F_GETFL, 0);
fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK);
// monitor new connection for read events, always in edge triggered
struct epoll_event event;
event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
event.data.fd = conn_sock;
// Allow epoll to monitor the new connection
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) {
perror("epoll_ctl: conn_sock");
break;
}
printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock);
if (max_fd < conn_sock) max_fd = conn_sock;
if (min_fd > conn_sock) min_fd = conn_sock;
}
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
std::array<std::mutex, MAX_EVENTS> queues_mutexes;
void thradizable(int &epoll_fd, int &master_socket, std::array<ControlledQueue, MAX_EVENTS>& queues_array, const int &th_flag, const int thread_index) {
epoll_event events[MAX_EVENTS];
while (true) {
if (th_flag == 1) break;
// Time measurements
///auto start = std::chrono::high_resolution_clock::now();
// Returns only the sockets for which there are events
//printf("Before wait\n");
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
//printf("After wait\n");
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
// Iterate on the sockets having events
for (int i = 0; i < nfds; i++) {
//printf("Tot fds = %d reading from %d\n", nfds, i);
int fd = events[i].data.fd;
if (fd == master_socket) {
// If the activity is on the master socket, than it's a new connection request
std::unique_lock<std::mutex> conn_lk(conn_mutex);
acceptConnectionEpollStyle(master_socket, epoll_fd);
conn_lk.unlock();
conn_cv.notify_all();
} else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
// Than the client connection is closed, so I close it
printf("Closing %d", fd);
close(fd);
} else {
//printf("Ev trig th %d with tot b %lu\n", thread_index, part);
// Than we received data from one of the monitored sockets
uint32_t buffer[BUFFER_SIZE_WORDS];
//while (valread != EAGAIN) {
std::unique_lock<std::mutex> lk(queues_mutexes[fd]);
ssize_t valread = recv(fd, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
if (valread > 0) {
//printf("[RICEVUTO]\t FROM %d\n", fd);
int opt_incr = (valread % 4) ? 1 : 0;
for (int q_i = 0; q_i < (valread/4) + opt_incr; q_i++) {
queues_array[fd].put(buffer[q_i]);
}
/*
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}*/
}
lk.unlock();
//}
}
}
///auto end = std::chrono::high_resolution_clock::now();
///double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
///
/*time_taken *= 1e-6;
total_time_taken += time_taken;
if (total_time_taken > 3e4) {
times.push_back(total_time_taken);
tot_received_data.push_back(kBytes_read);
break;
}*/
///
}
}
std::vector<double> times;
std::vector<double> timed_means;
double tot_time_taken = 0;
uint32_t tot_size_full_event = 0;
uint32_t num_events_stored = 0;
uint32_t fragments_lost = 0;
int fragments_per_event = max_fd - min_fd + 1;
void builder_thread (std::array<ControlledQueue, MAX_EVENTS> &queues_array, uint32_t &runNumber) {
uint32_t counter = 0;
while (1) {
auto start = std::chrono::high_resolution_clock::now();
if ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) {
std::unique_lock<std::mutex> conn_lk(conn_mutex);
while ( (max_fd < min_fd) || (exp_num <= 0) || ((max_fd - min_fd + 1) != exp_num)) {
conn_cv.wait(conn_lk);
}
conn_lk.unlock();
}
//printf("sono fuori bro");
FullEvent fullEvent;
fullEvent.headerSize = 5;
fullEvent.runNumber = runNumber;
fullEvent.eventNumber = counter;
fullEvent.fragmentsArray = new Fragment[max_fd - min_fd + 1];
uint32_t fullEventPayloadSize = 0;
double sumsize = 0;
for (int i = min_fd; i <= max_fd; i++){
//std::unique_lock<std::mutex> lk(queues_mutexes[i]);
int inCurrentEventNumber = 0;
while (inCurrentEventNumber != 1) {
try {
uint32_t starter = queues_array[i].get();
if (starter == FRAGMENT_HEADER_MARKER) {
uint32_t headerSize = queues_array[i].get();
uint32_t fragmentSize = queues_array[i].get();
uint32_t* buffer = new uint32_t[fragmentSize];
buffer[0] = starter;
buffer[1] = headerSize;
buffer[2] = fragmentSize;
for (int j = 3; j < fragmentSize; j++) {
buffer[j] = queues_array[i].get();
}
if (getDetectorEventNumber(buffer) < fullEvent.eventNumber) {
continue;
} else if (getDetectorEventNumber(buffer) == fullEvent.eventNumber) {
Fragment& fragment = fullEvent.fragmentsArray[i - min_fd];
decode_fragment(buffer, fragment);
inCurrentEventNumber = 1;
fullEventPayloadSize += fragment.header.fragmentSize;
} else {
printf("È successo un cazzo di casino");
}
delete [] buffer;
}
} catch (const TimeoutException& ex) {
//printf("Ho catchato\n");
inCurrentEventNumber = 1;
Fragment& fragment = fullEvent.fragmentsArray[i - min_fd];
Header& header = fragment.header;
header.sourceIdentifier = i - min_fd;
header.runNumber = runNumber;
header.detectorEventNumber = counter;
header.numberOfStatusElements = 1;
uint32_t firstStatusElement = 0x0;
firstStatusElement |= TIMEOUT_ERROR;
header.statusElementsArray = new uint32_t[header.numberOfStatusElements];
header.statusElementsArray[0] = firstStatusElement;
header.headerSize = 7 + header.numberOfStatusElements;
header.fragmentSize = header.headerSize;
//fragment.header = header;
//fullEvent.fragmentsArray[i - min_fd] = fragment;
fullEventPayloadSize += fragment.header.fragmentSize;
fragments_lost++;
}
}
sumsize += queues_array[i].size();
//lk.unlock();
}
double mean = sumsize / queues_array.size();
//timed_means.push_back(mean);
fullEvent.eventSize = fullEvent.headerSize + fullEventPayloadSize;
//printFullEvent(fullEvent);
num_events_stored = counter;
counter++;
auto end = std::chrono::high_resolution_clock::now();
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
time_taken *= 1e-9;
tot_time_taken += time_taken;
//times.push_back(tot_time_taken);
tot_size_full_event += fullEvent.eventSize;
//printf("timee %f", tot_time_taken);
if (tot_time_taken >= 30) break;
}
}
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 5) {
printf("Usage: %s portNumber runNumber numClients timeout_ms\n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
uint32_t runNumber = atoi(argv[2]);
exp_num = atoi(argv[3]);
int timeout_ms = atoi(argv[4]);
printf("You chose parameters: pnum %d runNum %d numClients %d timeout %d", port, runNumber, exp_num, timeout_ms);
printf("Start socket port %d\n", port);
int master_socket;
const int opt = 1;
master_socket = makeSocket();
//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
if( setsockopt(master_socket, IPPROTO_TCP, TCP_NODELAY, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
bindSocketPort(master_socket, port);
startListening(master_socket);
int flags = fcntl(master_socket, F_GETFL, 0);
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
epoll_event ev, events[MAX_EVENTS];
std::array<uint64_t, MAX_EVENTS> kBytes_read_on_descr;
//The atomic here is used as a flag to tell the thread to stop
std::vector<std::thread> vThreads;
//create the epoll instance
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
printf("Failed to create epoll file descriptor\n");
exit(EXIT_FAILURE);
}
ev.data.fd = master_socket;
// Reading events with edge triggered mode
ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
// Allowing epoll to monitor the master_socket
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
std::array<std::thread, READER_THREADS> vT;
std::array<int, READER_THREADS> thread_flags;
// Creating the data structure and initialization with max size and timeout ez win
std::array<ControlledQueue, MAX_EVENTS> queues_array;
for (auto& queue : queues_array) {
queue.init(MAX_QUEUE_SIZE, timeout_ms * 1000);
}
for (int t_i = 0; t_i < READER_THREADS; t_i++) {
thread_flags[t_i] = 0;
vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::ref(queues_array), std::cref(thread_flags.at(t_i)), t_i);
//vT[t_i].detach();
}
std::thread bdth(builder_thread, std::ref(queues_array), std::ref(runNumber));
//ADD CONSUMER THREAD
bdth.join();
//for (double mean : timed_means) {
// printf("%.0f ", mean);
//}
//sleep(30);
printf("fragments lost %d\n", fragments_lost);
printf("tot ev %d\n", num_events_stored);
std::ofstream outfile;
//Test on queue size over time
//outfile.open("queue_occup_1kmean_20ms_timeout50ms_500clients.csv");
//outfile << "times;means;\n";
//auto iter_times = times.begin();
//auto iter_sizes = timed_means.begin();
//for ( ; iter_times != times.end() && iter_sizes != timed_means.end(); (++iter_times, ++iter_sizes)) {
// outfile << *iter_times << ";" << *iter_sizes << ";\n";
//}
outfile.open("mean_4_std_0_meantime_0ms_stddevtime_0ms_NODELAY.csv", std::ios_base::app);
outfile << timeout_ms << ";" << tot_time_taken << ";" << tot_size_full_event * 4 << ";" << fragments_lost/static_cast<double>(fragments_per_event * num_events_stored) << ";\n";
if (close(epoll_fd)) {
printf("Failed to close epoll file descriptor");
exit(EXIT_FAILURE);
}
return 0;
}