SimpleEventBuilder/testing/connection_method/event_builder_select.cxx

301 lines
9.2 KiB
C++

#include <arpa/inet.h>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <vector>
#include <chrono>
#include <fstream>
#include <fcntl.h>
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void bindSocketPort(int server_fd, int port) {
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("FD %d bound to port %d\n", server_fd, port);
}
void startListening(int server_fd) {
if (listen(server_fd, 1024) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
printf("FD %d listening to new connections\n", server_fd);
}
int acceptConnection(int server_fd) {
int client_fd;
struct sockaddr_in remoteAddr;
size_t addrlen = sizeof(remoteAddr);
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
} else {
int flags = fcntl(client_fd, F_GETFL);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
#define TRUE 1
#define FALSE 0
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int opt = TRUE;
int master_socket , addrlen , new_socket , client_socket[1024] ,
max_clients = 1024 , activity, i , valread , sd;
int max_sd;
//set of socket descriptors
fd_set readfds;
//initialise all client_socket[] to 0 so not checked
for (i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
master_socket = makeSocket();
//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
bindSocketPort(master_socket, port);
startListening(master_socket);
int flags = fcntl(master_socket, F_GETFL, 0);
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
std::vector<int> sizes;
std::vector<uint64_t> tot_received_data;
std::vector<double> times;
int increment = 499;
for (int kikko = 1; kikko < 1e6 + 1;) {
switch (kikko) {
case 500:
increment = 500;
break;
case (int) 1e3:
increment = 1e3;
break;
case (int) 5e3:
increment = 5e3;
break;
case (int) 1e4:
increment = 1e4;
break;
case (int) 5e4:
increment = 5e4;
break;
case (int) 1e5:
increment = 1e5;
break;
case (int) 5e5:
increment = 5e5;
break;
}
printf("Next increment %d with current i: %d\n", increment, kikko);
uint64_t bytes_read = 0;
uint64_t kBytes_read = 0;
double total_time_taken = 0;
while (true) {
auto start = std::chrono::high_resolution_clock::now();
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(master_socket, &readfds);
max_sd = master_socket;
//add child sockets to set
for ( i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(master_socket, &readfds))
{
new_socket = acceptConnection(master_socket);
//add new socket to array of sockets
for (i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
//else its some IO operation on some other socket
for (i = 0; i < max_clients; i++)
{
sd = client_socket[i];
if (FD_ISSET( sd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
char buffer[kikko];
/*if ((valread = recv( sd , &buffer, kikko, 0)) >= 0)
{
printf("[RICEVUTO]\t FROM %d valread: %d\n", sd, valread);
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}
}*/
if ((valread = recv( sd , &buffer, kikko, 0)) == 0)
{
struct sockaddr_in address;
int addrlen;
//Somebody disconnected , get his details and print
getpeername(sd , (struct sockaddr*)&address , \
(socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" ,
inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
printf("Disconnected fd %d", sd);
//Close the socket and mark as 0 in list for reuse
close( sd );
client_socket[i] = 0;
}
//Echo back the message that came in
else
{
if (valread > 0) {
//printf("[RICEVUTO]\t FROM %d\n", sd);
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}
}
}
}
}
auto end = std::chrono::high_resolution_clock::now();
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
time_taken *= 1e-6;
total_time_taken += time_taken;
if (total_time_taken > 3e4) {
times.push_back(total_time_taken);
sizes.push_back(kikko);
tot_received_data.push_back(kBytes_read);
break;
}
//sleep(3);
}
kikko += increment;
}
std::ofstream fout;
fout.open("select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv");
//the time is in milliseconds and the data in kbytes
fout << "buffer_size;time;total_received_data;\n";
auto iter_sizes = sizes.begin();
auto iter_times = times.begin();
auto iter_data = tot_received_data.begin();
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n";
}
fout.close();
return 0;
}