Various changes and extended preliminary testing
parent
11e3f51a3a
commit
14795161ec
BIN
evBuild.out
BIN
evBuild.out
Binary file not shown.
|
@ -10,6 +10,11 @@
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include <sys/time.h>
|
#include <sys/time.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
#include <queue>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
|
|
||||||
int makeSocket() {
|
int makeSocket() {
|
||||||
int sockfd;
|
int sockfd;
|
||||||
|
@ -44,6 +49,7 @@ void startListening(int server_fd) {
|
||||||
int acceptConnection(int server_fd) {
|
int acceptConnection(int server_fd) {
|
||||||
int client_fd;
|
int client_fd;
|
||||||
struct sockaddr_in remoteAddr;
|
struct sockaddr_in remoteAddr;
|
||||||
|
|
||||||
size_t addrlen = sizeof(remoteAddr);
|
size_t addrlen = sizeof(remoteAddr);
|
||||||
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
|
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
|
||||||
perror("accept");
|
perror("accept");
|
||||||
|
@ -91,8 +97,6 @@ int main(int argc, char const *argv[]) {
|
||||||
#define TRUE 1
|
#define TRUE 1
|
||||||
#define FALSE 0
|
#define FALSE 0
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int main(int argc, char const *argv[]) {
|
int main(int argc, char const *argv[]) {
|
||||||
signal(SIGTERM, term_handler);
|
signal(SIGTERM, term_handler);
|
||||||
|
|
||||||
|
@ -139,9 +143,90 @@ int main(int argc, char const *argv[]) {
|
||||||
FD_SET(master_socket, &readfds);
|
FD_SET(master_socket, &readfds);
|
||||||
max_sd = master_socket;
|
max_sd = master_socket;
|
||||||
|
|
||||||
|
//add child sockets to set
|
||||||
|
for ( i = 0 ; i < max_clients ; i++)
|
||||||
|
{
|
||||||
|
//socket descriptor
|
||||||
|
sd = client_socket[i];
|
||||||
|
|
||||||
|
//if valid socket descriptor then add to read list
|
||||||
|
if(sd > 0)
|
||||||
|
FD_SET( sd , &readfds);
|
||||||
|
|
||||||
|
//highest file descriptor number, need it for the select function
|
||||||
|
if(sd > max_sd)
|
||||||
|
max_sd = sd;
|
||||||
|
}
|
||||||
|
|
||||||
|
//wait for an activity on one of the sockets , timeout is NULL ,
|
||||||
|
//so wait indefinitely
|
||||||
|
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
|
||||||
|
|
||||||
|
if ((activity < 0) && (errno!=EINTR))
|
||||||
|
{
|
||||||
|
printf("select error");
|
||||||
|
}
|
||||||
|
|
||||||
|
//If something happened on the master socket ,
|
||||||
|
//then its an incoming connection
|
||||||
|
if (FD_ISSET(master_socket, &readfds))
|
||||||
|
{
|
||||||
|
new_socket = acceptConnection(master_socket);
|
||||||
|
|
||||||
|
//add new socket to array of sockets
|
||||||
|
for (i = 0; i < max_clients; i++)
|
||||||
|
{
|
||||||
|
//if position is empty
|
||||||
|
if( client_socket[i] == 0 )
|
||||||
|
{
|
||||||
|
client_socket[i] = new_socket;
|
||||||
|
printf("Adding to list of sockets as %d\n" , i);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//else its some IO operation on some other socket
|
||||||
|
for (i = 0; i < max_clients; i++)
|
||||||
|
{
|
||||||
|
sd = client_socket[i];
|
||||||
|
|
||||||
|
if (FD_ISSET( sd , &readfds))
|
||||||
|
{
|
||||||
|
//Check if it was for closing , and also read the
|
||||||
|
//incoming message
|
||||||
|
uint32_t word;
|
||||||
|
if ((valread = recv( sd , &word, 4, 0)) == 0)
|
||||||
|
{
|
||||||
|
struct sockaddr_in address;
|
||||||
|
int addrlen;
|
||||||
|
//Somebody disconnected , get his details and print
|
||||||
|
getpeername(sd , (struct sockaddr*)&address , \
|
||||||
|
(socklen_t*)&addrlen);
|
||||||
|
printf("Host disconnected , ip %s , port %d \n" ,
|
||||||
|
inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
|
||||||
|
|
||||||
|
printf("Disconnected fd %d", sd);
|
||||||
|
|
||||||
|
//Close the socket and mark as 0 in list for reuse
|
||||||
|
close( sd );
|
||||||
|
client_socket[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Echo back the message that came in
|
||||||
|
else
|
||||||
|
{
|
||||||
|
printf("[RICEVUTO]\t0x%x FROM %d\n", word, sd);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
|
||||||
int client_fd = acceptConnection(server_fd);
|
int client_fd = acceptConnection(server_fd);
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
|
@ -152,7 +237,7 @@ int main(int argc, char const *argv[]) {
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
printf("[RICEVUTO]\t0x%x\n", word);
|
printf("[RICEVUTO]\t0x%x\n", word);
|
||||||
}
|
}*/
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -133,7 +133,7 @@ int main(int argc, char* argv[]) {
|
||||||
encode_fragment(buffer, fragment);
|
encode_fragment(buffer, fragment);
|
||||||
ssize_t bytes = send(socket, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
|
ssize_t bytes = send(socket, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
|
||||||
if (bytes != header.fragmentSize * sizeof(uint32_t)) {
|
if (bytes != header.fragmentSize * sizeof(uint32_t)) {
|
||||||
perror("Send failed: nub bytes not matching");
|
perror("Send failed: num bytes not matching");
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
#!/bin/bash
|
||||||
|
echo "Usage: $0 host port runNumber numberOfProviders"
|
||||||
|
|
||||||
|
if [ $# -eq 4 ]
|
||||||
|
then
|
||||||
|
echo "Selected host: $1:$2 runNumber: $3 numberOfProviders: $4"
|
||||||
|
for i in $(seq 1 $4);
|
||||||
|
do
|
||||||
|
echo "Spawning provider number $i"
|
||||||
|
./prov.out $1 $2 $3 &
|
||||||
|
done
|
||||||
|
fi
|
Binary file not shown.
|
@ -0,0 +1,105 @@
|
||||||
|
#include <algorithm>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <ctime>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <csignal>
|
||||||
|
#include <chrono>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <tuple>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
|
int makeSocket() {
|
||||||
|
int sockfd;
|
||||||
|
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
|
perror("socket failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void connectTo(int sock, const char* host, int port) {
|
||||||
|
struct sockaddr_in serv_addr;
|
||||||
|
serv_addr.sin_family = AF_INET;
|
||||||
|
serv_addr.sin_addr.s_addr = inet_addr(host);
|
||||||
|
serv_addr.sin_port = htons(port);
|
||||||
|
|
||||||
|
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
|
||||||
|
perror("Connection failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("Connected to %s: %d\n", host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
|
||||||
|
int socket = makeSocket();
|
||||||
|
connectTo(socket, "127.0.0.1", 7777);
|
||||||
|
|
||||||
|
//allocating 100 megabytes of memory
|
||||||
|
uint64_t* chunky_boy = new uint64_t[13107200];
|
||||||
|
size_t chunky_boy_size = 13107200 * sizeof(uint64_t);
|
||||||
|
printf("chonky size %d", static_cast<int>(chunky_boy_size));
|
||||||
|
sleep(5);
|
||||||
|
|
||||||
|
//setting memory to verify non-emptyness
|
||||||
|
memset(chunky_boy, 45678832, chunky_boy_size);
|
||||||
|
|
||||||
|
std::vector<int> sizes;
|
||||||
|
std::vector<double> times;
|
||||||
|
|
||||||
|
|
||||||
|
for ( int i = 1; i < 1e6 + 2; i += 1e3) {
|
||||||
|
|
||||||
|
printf("Selected buffer size: %d\n", i);
|
||||||
|
auto start = std::chrono::high_resolution_clock::now();
|
||||||
|
int kilos = 0;
|
||||||
|
for ( int j = 0; j < chunky_boy_size; ) {
|
||||||
|
|
||||||
|
ssize_t bytes = send(socket, reinterpret_cast<char*>(chunky_boy) + j, std::min(static_cast<int>(chunky_boy_size) - j, i), 0);
|
||||||
|
j += i;
|
||||||
|
if (j%1024 == 0) {
|
||||||
|
kilos++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
auto end = std::chrono::high_resolution_clock::now();
|
||||||
|
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
|
||||||
|
|
||||||
|
time_taken *= 1e-9;
|
||||||
|
|
||||||
|
times.push_back(time_taken);
|
||||||
|
sizes.push_back(i);
|
||||||
|
|
||||||
|
std::cout << "Time taken by program is : " << std::fixed
|
||||||
|
<< time_taken << std::setprecision(9);
|
||||||
|
std::cout << " sec" << std::endl;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ofstream fout;
|
||||||
|
fout.open("send_data_buffer");
|
||||||
|
fout << "buffer_size;time;\n";
|
||||||
|
auto iter_sizes = sizes.begin();
|
||||||
|
auto iter_times = times.begin();
|
||||||
|
|
||||||
|
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) ; (++iter_sizes, ++iter_times) ) {
|
||||||
|
fout << *iter_sizes << ";" << *iter_times << ";\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
fout.close();
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Binary file not shown.
|
@ -0,0 +1,91 @@
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <csignal>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <sys/select.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
int makeSocket() {
|
||||||
|
int sockfd;
|
||||||
|
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
|
perror("socket failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void bindSocketPort(int server_fd, int port) {
|
||||||
|
struct sockaddr_in localAddr;
|
||||||
|
localAddr.sin_family = AF_INET;
|
||||||
|
localAddr.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
localAddr.sin_port = htons(port);
|
||||||
|
|
||||||
|
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
||||||
|
perror("bind failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d bound to port %d\n", server_fd, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
void startListening(int server_fd) {
|
||||||
|
if (listen(server_fd, 3) < 0) {
|
||||||
|
perror("listen");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d listening to new connections\n", server_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
int acceptConnection(int server_fd) {
|
||||||
|
int client_fd;
|
||||||
|
struct sockaddr_in remoteAddr;
|
||||||
|
size_t addrlen = sizeof(remoteAddr);
|
||||||
|
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
|
||||||
|
perror("accept");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
|
||||||
|
return client_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
if (argc != 2) {
|
||||||
|
printf("Usage: %s portNumber \n", argv[0]);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
int port = atoi(argv[1]);
|
||||||
|
printf("Start socket port %d\n", port);
|
||||||
|
|
||||||
|
int server_fd = makeSocket();
|
||||||
|
bindSocketPort(server_fd, port);
|
||||||
|
startListening(server_fd);
|
||||||
|
int client_fd = acceptConnection(server_fd);
|
||||||
|
|
||||||
|
uint64_t bytes_read = 0;
|
||||||
|
uint64_t kbytes_read = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
char buffer[32 * 1024] = {0};
|
||||||
|
ssize_t bytes = read(client_fd, &buffer, sizeof(buffer));
|
||||||
|
|
||||||
|
bytes_read += static_cast<uint64_t>(bytes);
|
||||||
|
//printf("tot bytos %lu\n", bytes_read);
|
||||||
|
|
||||||
|
if (uint64_t newkb = (bytes_read / 1024) > 0) {
|
||||||
|
//printf("before: %lu", bytes_read);
|
||||||
|
bytes_read -= (1024 * newkb);
|
||||||
|
//printf("after: %lu", bytes_read);
|
||||||
|
kbytes_read += newkb;
|
||||||
|
//printf("Read %d kbytes\n", static_cast<int>(kbytes_read));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000;45817;
|
||||||
|
500;30000;16688512;
|
||||||
|
1000;30000;26801219;
|
||||||
|
2000;30004.1;40979357;
|
||||||
|
3000;30000.2;49705756;
|
||||||
|
4000;30000.1;56029482;
|
||||||
|
5000;30000.1;60047108;
|
||||||
|
10000;30000.2;99444868;
|
||||||
|
20000;30000.2;106228515;
|
||||||
|
30000;30000.6;103007812;
|
||||||
|
40000;30000.5;112140625;
|
||||||
|
50000;30000.8;104677734;
|
||||||
|
100000;30002.8;119785156;
|
||||||
|
200000;30001.2;117539062;
|
||||||
|
300000;30002.7;127939453;
|
||||||
|
400000;30010.1;121523437;
|
||||||
|
500000;30004.6;125390625;
|
||||||
|
1000000;30036.4;126757812;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.5;26535;
|
||||||
|
500;30000.9;12931772;
|
||||||
|
1000;30000.8;20953270;
|
||||||
|
2000;30000.7;30575921;
|
||||||
|
3000;30000.2;37463270;
|
||||||
|
4000;30003.3;42943248;
|
||||||
|
5000;30000.4;47972201;
|
||||||
|
10000;30003.9;57650021;
|
||||||
|
20000;30001.7;70555676;
|
||||||
|
30000;30006.6;66315379;
|
||||||
|
40000;30000.8;64524353;
|
||||||
|
50000;30006.4;80692139;
|
||||||
|
100000;30008.4;93501600;
|
||||||
|
200000;30002.2;103220112;
|
||||||
|
300000;30010;108385843;
|
||||||
|
400000;30002.3;110109484;
|
||||||
|
500000;30041.3;106363740;
|
||||||
|
1000000;30000;103425306;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30001.7;23972;
|
||||||
|
500;30001.1;16452118;
|
||||||
|
1000;30000;28659540;
|
||||||
|
2000;30000;43843616;
|
||||||
|
3000;30000;53965134;
|
||||||
|
4000;30000;78146535;
|
||||||
|
5000;30000;81098297;
|
||||||
|
10000;30000.1;104959571;
|
||||||
|
20000;30000;103154410;
|
||||||
|
30000;30000.1;98912274;
|
||||||
|
40000;30000.2;106528334;
|
||||||
|
50000;30000;108412018;
|
||||||
|
100000;30000.1;99869795;
|
||||||
|
200000;30001.9;113863135;
|
||||||
|
300000;30006.9;113726256;
|
||||||
|
400000;30003.6;113588963;
|
||||||
|
500000;30004.1;124238792;
|
||||||
|
1000000;30005.3;127730003;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.7;32735;
|
||||||
|
500;30000;18563821;
|
||||||
|
1000;30000;33321199;
|
||||||
|
2000;30000;57600348;
|
||||||
|
3000;30000;72226712;
|
||||||
|
4000;30000;80548950;
|
||||||
|
5000;30000;92428421;
|
||||||
|
10000;30000;125112074;
|
||||||
|
20000;30000;130615660;
|
||||||
|
30000;30000;133488130;
|
||||||
|
40000;30000;149347337;
|
||||||
|
50000;30000;141748519;
|
||||||
|
100000;30000;145676700;
|
||||||
|
200000;30000.1;167065891;
|
||||||
|
300000;30000.1;180569049;
|
||||||
|
400000;30000;186854294;
|
||||||
|
500000;30000;191359610;
|
||||||
|
1000000;30000;187503120;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.3;38239;
|
||||||
|
500;30000.3;15008694;
|
||||||
|
1000;30000.5;24572299;
|
||||||
|
2000;30000.1;34510344;
|
||||||
|
3000;30001;41199235;
|
||||||
|
4000;30000.9;45359880;
|
||||||
|
5000;30000.5;48985625;
|
||||||
|
10000;30003.5;59779061;
|
||||||
|
20000;30003.2;66210747;
|
||||||
|
30000;30000.4;70755300;
|
||||||
|
40000;30002.9;91266308;
|
||||||
|
50000;30005.9;112726855;
|
||||||
|
100000;30002;120034453;
|
||||||
|
200000;30004.6;120310170;
|
||||||
|
300000;30011.9;110168294;
|
||||||
|
400000;30017;103690777;
|
||||||
|
500000;30023.6;106875799;
|
||||||
|
1000000;30065.7;111296658;
|
|
Binary file not shown.
Binary file not shown.
|
@ -0,0 +1,283 @@
|
||||||
|
#include <asm-generic/errno-base.h>
|
||||||
|
#include <asm-generic/errno.h>
|
||||||
|
#include <cerrno>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdio> // for fprintf()
|
||||||
|
#include <unistd.h> // for close(), read()
|
||||||
|
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
|
||||||
|
#include <cstring> // for strncmp
|
||||||
|
|
||||||
|
//my addition to the online guide
|
||||||
|
#include <csignal>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
|
||||||
|
#include <fcntl.h>
|
||||||
|
#include <vector>
|
||||||
|
#include <chrono>
|
||||||
|
#include <fstream>
|
||||||
|
|
||||||
|
#define MAX_EVENTS 20000
|
||||||
|
|
||||||
|
int makeSocket() {
|
||||||
|
int sockfd;
|
||||||
|
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
|
perror("socket failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void bindSocketPort(int server_fd, int port) {
|
||||||
|
struct sockaddr_in localAddr;
|
||||||
|
localAddr.sin_family = AF_INET;
|
||||||
|
localAddr.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
localAddr.sin_port = htons(port);
|
||||||
|
|
||||||
|
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
||||||
|
perror("bind failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d bound to port %d\n", server_fd, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
void startListening(int server_fd) {
|
||||||
|
if (listen(server_fd, 20000) < 0) {
|
||||||
|
perror("listen");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d listening to new connections\n", server_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
int acceptConnection(int server_fd) {
|
||||||
|
int client_fd;
|
||||||
|
struct sockaddr_in remoteAddr;
|
||||||
|
|
||||||
|
size_t addrlen = sizeof(remoteAddr);
|
||||||
|
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
|
||||||
|
perror("accept");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
} else {
|
||||||
|
int flags = fcntl(client_fd, F_GETFL);
|
||||||
|
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
}
|
||||||
|
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
|
||||||
|
return client_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void acceptConnectionEpollStyle(int server_fd, int &efd) {
|
||||||
|
struct sockaddr_in new_remoteAddr;
|
||||||
|
int addrlen = sizeof(struct sockaddr_in);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen);
|
||||||
|
|
||||||
|
if (conn_sock == -1) {
|
||||||
|
// All incoming connections have been processed
|
||||||
|
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
perror("accept");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// make new connection non-blocking
|
||||||
|
int flags = fcntl(conn_sock, F_GETFL, 0);
|
||||||
|
fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
|
||||||
|
// monitor new connection for read events, always in edge triggered
|
||||||
|
struct epoll_event event;
|
||||||
|
event.events = EPOLLIN;// | EPOLLET;
|
||||||
|
event.data.fd = conn_sock;
|
||||||
|
|
||||||
|
// Allow epoll to monitor the new connection
|
||||||
|
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) {
|
||||||
|
perror("epoll_ctl: conn_sock");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void term_handler(int signal) {
|
||||||
|
printf("Terminated, received SIGNAL %d", signal);
|
||||||
|
exit(EXIT_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char const *argv[]) {
|
||||||
|
|
||||||
|
signal(SIGTERM, term_handler);
|
||||||
|
|
||||||
|
|
||||||
|
if (argc != 2) {
|
||||||
|
printf("Usage: %s portNumber \n", argv[0]);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
int port = atoi(argv[1]);
|
||||||
|
printf("Start socket port %d\n", port);
|
||||||
|
|
||||||
|
int master_socket;
|
||||||
|
const int opt = 1;
|
||||||
|
|
||||||
|
|
||||||
|
master_socket = makeSocket();
|
||||||
|
|
||||||
|
//set master socket to allow multiple connections ,
|
||||||
|
//this is just a good habit, it will work without this
|
||||||
|
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
|
||||||
|
sizeof(opt)) < 0 )
|
||||||
|
{
|
||||||
|
perror("setsockopt");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
bindSocketPort(master_socket, port);
|
||||||
|
startListening(master_socket);
|
||||||
|
|
||||||
|
int flags = fcntl(master_socket, F_GETFL, 0);
|
||||||
|
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
|
||||||
|
struct epoll_event ev, events[MAX_EVENTS];
|
||||||
|
|
||||||
|
//create the epoll instance
|
||||||
|
int epoll_fd = epoll_create1(0);
|
||||||
|
if (epoll_fd == -1) {
|
||||||
|
printf("Failed to create epoll file descriptor\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
ev.data.fd = master_socket;
|
||||||
|
// Reading events with edge triggered mode
|
||||||
|
ev.events = EPOLLIN;// | EPOLLET;
|
||||||
|
|
||||||
|
// Allowing epoll to monitor the master_socket
|
||||||
|
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){
|
||||||
|
perror("epoll_ctl");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<int> sizes;
|
||||||
|
std::vector<uint64_t> tot_received_data;
|
||||||
|
std::vector<double> times;
|
||||||
|
|
||||||
|
int increment = 499;
|
||||||
|
|
||||||
|
for (int buf_size = 1; buf_size < 1e6 + 1; ) {
|
||||||
|
switch (buf_size) {
|
||||||
|
case 500:
|
||||||
|
increment = 500;
|
||||||
|
break;
|
||||||
|
case (int) 1e3:
|
||||||
|
increment = 1e3;
|
||||||
|
break;
|
||||||
|
case (int) 5e3:
|
||||||
|
increment = 5e3;
|
||||||
|
break;
|
||||||
|
case (int) 1e4:
|
||||||
|
increment = 1e4;
|
||||||
|
break;
|
||||||
|
case (int) 5e4:
|
||||||
|
increment = 5e4;
|
||||||
|
break;
|
||||||
|
case (int) 1e5:
|
||||||
|
increment = 1e5;
|
||||||
|
break;
|
||||||
|
case (int) 5e5:
|
||||||
|
increment = 5e5;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
printf("Next increment %d with current i: %d\n", increment, buf_size);
|
||||||
|
|
||||||
|
uint64_t bytes_read = 0;
|
||||||
|
uint64_t kBytes_read = 0;
|
||||||
|
double total_time_taken = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
// Time measurements
|
||||||
|
auto start = std::chrono::high_resolution_clock::now();
|
||||||
|
|
||||||
|
// Returns only the sockets for which there are events
|
||||||
|
//printf("Before wait\n");
|
||||||
|
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
|
||||||
|
//printf("After wait\n");
|
||||||
|
if (nfds == -1) {
|
||||||
|
perror("epoll_wait");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Iterate on the sockets having events
|
||||||
|
for (int i = 0; i < nfds; i++) {
|
||||||
|
//printf("Tot fds = %d reading from %d\n", nfds, i);
|
||||||
|
int fd = events[i].data.fd;
|
||||||
|
if (fd == master_socket) {
|
||||||
|
// If the activity is on the master socket, than it's a new connection request
|
||||||
|
acceptConnectionEpollStyle(master_socket, epoll_fd);
|
||||||
|
} else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
|
||||||
|
// Than the client connection is closed, so I close it
|
||||||
|
printf("Closing %d", fd);
|
||||||
|
close(fd);
|
||||||
|
} else {
|
||||||
|
// Than we received data from one of the monitored sockets
|
||||||
|
char buffer[buf_size];
|
||||||
|
int valread = 0;
|
||||||
|
//while (valread != EAGAIN) {
|
||||||
|
valread = recv(fd, &buffer, buf_size, 0);
|
||||||
|
if (valread > 0) {
|
||||||
|
//printf("[RICEVUTO]\t FROM %d\n", fd);
|
||||||
|
bytes_read += valread;
|
||||||
|
int kilos = 0;
|
||||||
|
if ((kilos = bytes_read / 1024) > 0) {
|
||||||
|
kBytes_read += kilos;
|
||||||
|
bytes_read -= (kilos * 1024);
|
||||||
|
//printf("reade bites %lu", bytes_read);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto end = std::chrono::high_resolution_clock::now();
|
||||||
|
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
|
||||||
|
//time taken in milliseconds
|
||||||
|
time_taken *= 1e-6;
|
||||||
|
total_time_taken += time_taken;
|
||||||
|
|
||||||
|
if (total_time_taken > 3e4) {
|
||||||
|
times.push_back(total_time_taken);
|
||||||
|
sizes.push_back(buf_size);
|
||||||
|
tot_received_data.push_back(kBytes_read);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf_size += increment;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ofstream fout;
|
||||||
|
fout.open("epoll_data_stats_1000_TIMEOUT.csv");
|
||||||
|
//the time is in milliseconds and the data in kbytes
|
||||||
|
fout << "buffer_size;time;total_received_data;\n";
|
||||||
|
auto iter_sizes = sizes.begin();
|
||||||
|
auto iter_times = times.begin();
|
||||||
|
auto iter_data = tot_received_data.begin();
|
||||||
|
|
||||||
|
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
|
||||||
|
fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
fout.close();
|
||||||
|
|
||||||
|
if (close(epoll_fd)) {
|
||||||
|
printf("Failed to close epoll file descriptor");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,301 @@
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <csignal>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <sys/select.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <sys/types.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
#include <sys/time.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
#include <vector>
|
||||||
|
#include <chrono>
|
||||||
|
#include <fstream>
|
||||||
|
#include <fcntl.h>
|
||||||
|
|
||||||
|
|
||||||
|
int makeSocket() {
|
||||||
|
int sockfd;
|
||||||
|
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
|
perror("socket failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void bindSocketPort(int server_fd, int port) {
|
||||||
|
struct sockaddr_in localAddr;
|
||||||
|
localAddr.sin_family = AF_INET;
|
||||||
|
localAddr.sin_addr.s_addr = INADDR_ANY;
|
||||||
|
localAddr.sin_port = htons(port);
|
||||||
|
|
||||||
|
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
|
||||||
|
perror("bind failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d bound to port %d\n", server_fd, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
void startListening(int server_fd) {
|
||||||
|
if (listen(server_fd, 1024) < 0) {
|
||||||
|
perror("listen");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
printf("FD %d listening to new connections\n", server_fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
int acceptConnection(int server_fd) {
|
||||||
|
int client_fd;
|
||||||
|
struct sockaddr_in remoteAddr;
|
||||||
|
|
||||||
|
size_t addrlen = sizeof(remoteAddr);
|
||||||
|
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
|
||||||
|
perror("accept");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
} else {
|
||||||
|
int flags = fcntl(client_fd, F_GETFL);
|
||||||
|
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
}
|
||||||
|
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
|
||||||
|
return client_fd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void term_handler(int signal) {
|
||||||
|
printf("Terminated, received SIGNAL %d", signal);
|
||||||
|
exit(EXIT_SUCCESS);
|
||||||
|
}
|
||||||
|
|
||||||
|
#define TRUE 1
|
||||||
|
#define FALSE 0
|
||||||
|
|
||||||
|
int main(int argc, char const *argv[]) {
|
||||||
|
|
||||||
|
signal(SIGTERM, term_handler);
|
||||||
|
|
||||||
|
if (argc != 2) {
|
||||||
|
printf("Usage: %s portNumber \n", argv[0]);
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
int port = atoi(argv[1]);
|
||||||
|
printf("Start socket port %d\n", port);
|
||||||
|
|
||||||
|
int opt = TRUE;
|
||||||
|
int master_socket , addrlen , new_socket , client_socket[1024] ,
|
||||||
|
max_clients = 1024 , activity, i , valread , sd;
|
||||||
|
int max_sd;
|
||||||
|
|
||||||
|
//set of socket descriptors
|
||||||
|
fd_set readfds;
|
||||||
|
|
||||||
|
//initialise all client_socket[] to 0 so not checked
|
||||||
|
for (i = 0; i < max_clients; i++)
|
||||||
|
{
|
||||||
|
client_socket[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
master_socket = makeSocket();
|
||||||
|
|
||||||
|
//set master socket to allow multiple connections ,
|
||||||
|
//this is just a good habit, it will work without this
|
||||||
|
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
|
||||||
|
sizeof(opt)) < 0 )
|
||||||
|
{
|
||||||
|
perror("setsockopt");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
bindSocketPort(master_socket, port);
|
||||||
|
startListening(master_socket);
|
||||||
|
|
||||||
|
int flags = fcntl(master_socket, F_GETFL, 0);
|
||||||
|
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
|
||||||
|
std::vector<int> sizes;
|
||||||
|
std::vector<uint64_t> tot_received_data;
|
||||||
|
std::vector<double> times;
|
||||||
|
|
||||||
|
int increment = 499;
|
||||||
|
|
||||||
|
for (int kikko = 1; kikko < 1e6 + 1;) {
|
||||||
|
switch (kikko) {
|
||||||
|
case 500:
|
||||||
|
increment = 500;
|
||||||
|
break;
|
||||||
|
case (int) 1e3:
|
||||||
|
increment = 1e3;
|
||||||
|
break;
|
||||||
|
case (int) 5e3:
|
||||||
|
increment = 5e3;
|
||||||
|
break;
|
||||||
|
case (int) 1e4:
|
||||||
|
increment = 1e4;
|
||||||
|
break;
|
||||||
|
case (int) 5e4:
|
||||||
|
increment = 5e4;
|
||||||
|
break;
|
||||||
|
case (int) 1e5:
|
||||||
|
increment = 1e5;
|
||||||
|
break;
|
||||||
|
case (int) 5e5:
|
||||||
|
increment = 5e5;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
printf("Next increment %d with current i: %d\n", increment, kikko);
|
||||||
|
|
||||||
|
uint64_t bytes_read = 0;
|
||||||
|
uint64_t kBytes_read = 0;
|
||||||
|
double total_time_taken = 0;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
auto start = std::chrono::high_resolution_clock::now();
|
||||||
|
//clear the socket set
|
||||||
|
FD_ZERO(&readfds);
|
||||||
|
|
||||||
|
//add master socket to set
|
||||||
|
FD_SET(master_socket, &readfds);
|
||||||
|
max_sd = master_socket;
|
||||||
|
|
||||||
|
//add child sockets to set
|
||||||
|
for ( i = 0 ; i < max_clients ; i++)
|
||||||
|
{
|
||||||
|
//socket descriptor
|
||||||
|
sd = client_socket[i];
|
||||||
|
|
||||||
|
//if valid socket descriptor then add to read list
|
||||||
|
if(sd > 0)
|
||||||
|
FD_SET( sd , &readfds);
|
||||||
|
|
||||||
|
//highest file descriptor number, need it for the select function
|
||||||
|
if(sd > max_sd)
|
||||||
|
max_sd = sd;
|
||||||
|
}
|
||||||
|
|
||||||
|
//wait for an activity on one of the sockets , timeout is NULL ,
|
||||||
|
//so wait indefinitely
|
||||||
|
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
|
||||||
|
|
||||||
|
if ((activity < 0) && (errno!=EINTR))
|
||||||
|
{
|
||||||
|
printf("select error");
|
||||||
|
}
|
||||||
|
|
||||||
|
//If something happened on the master socket ,
|
||||||
|
//then its an incoming connection
|
||||||
|
if (FD_ISSET(master_socket, &readfds))
|
||||||
|
{
|
||||||
|
new_socket = acceptConnection(master_socket);
|
||||||
|
|
||||||
|
//add new socket to array of sockets
|
||||||
|
for (i = 0; i < max_clients; i++)
|
||||||
|
{
|
||||||
|
//if position is empty
|
||||||
|
if( client_socket[i] == 0 )
|
||||||
|
{
|
||||||
|
client_socket[i] = new_socket;
|
||||||
|
printf("Adding to list of sockets as %d\n" , i);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//else its some IO operation on some other socket
|
||||||
|
for (i = 0; i < max_clients; i++)
|
||||||
|
{
|
||||||
|
sd = client_socket[i];
|
||||||
|
|
||||||
|
if (FD_ISSET( sd , &readfds))
|
||||||
|
{
|
||||||
|
//Check if it was for closing , and also read the
|
||||||
|
//incoming message
|
||||||
|
char buffer[kikko];
|
||||||
|
/*if ((valread = recv( sd , &buffer, kikko, 0)) >= 0)
|
||||||
|
{
|
||||||
|
printf("[RICEVUTO]\t FROM %d valread: %d\n", sd, valread);
|
||||||
|
bytes_read += valread;
|
||||||
|
int kilos = 0;
|
||||||
|
if ((kilos = bytes_read / 1024) > 0) {
|
||||||
|
kBytes_read += kilos;
|
||||||
|
bytes_read -= (kilos * 1024);
|
||||||
|
//printf("reade bites %lu", bytes_read);
|
||||||
|
}
|
||||||
|
}*/
|
||||||
|
|
||||||
|
if ((valread = recv( sd , &buffer, kikko, 0)) == 0)
|
||||||
|
{
|
||||||
|
struct sockaddr_in address;
|
||||||
|
int addrlen;
|
||||||
|
//Somebody disconnected , get his details and print
|
||||||
|
getpeername(sd , (struct sockaddr*)&address , \
|
||||||
|
(socklen_t*)&addrlen);
|
||||||
|
printf("Host disconnected , ip %s , port %d \n" ,
|
||||||
|
inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
|
||||||
|
|
||||||
|
printf("Disconnected fd %d", sd);
|
||||||
|
|
||||||
|
//Close the socket and mark as 0 in list for reuse
|
||||||
|
close( sd );
|
||||||
|
client_socket[i] = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
//Echo back the message that came in
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (valread > 0) {
|
||||||
|
//printf("[RICEVUTO]\t FROM %d\n", sd);
|
||||||
|
bytes_read += valread;
|
||||||
|
int kilos = 0;
|
||||||
|
if ((kilos = bytes_read / 1024) > 0) {
|
||||||
|
kBytes_read += kilos;
|
||||||
|
bytes_read -= (kilos * 1024);
|
||||||
|
//printf("reade bites %lu", bytes_read);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
auto end = std::chrono::high_resolution_clock::now();
|
||||||
|
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
|
||||||
|
//time taken in milliseconds
|
||||||
|
time_taken *= 1e-6;
|
||||||
|
total_time_taken += time_taken;
|
||||||
|
|
||||||
|
if (total_time_taken > 3e4) {
|
||||||
|
times.push_back(total_time_taken);
|
||||||
|
sizes.push_back(kikko);
|
||||||
|
tot_received_data.push_back(kBytes_read);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
//sleep(3);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
kikko += increment;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
std::ofstream fout;
|
||||||
|
fout.open("select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv");
|
||||||
|
//the time is in milliseconds and the data in kbytes
|
||||||
|
fout << "buffer_size;time;total_received_data;\n";
|
||||||
|
auto iter_sizes = sizes.begin();
|
||||||
|
auto iter_times = times.begin();
|
||||||
|
auto iter_data = tot_received_data.begin();
|
||||||
|
|
||||||
|
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
|
||||||
|
fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n";
|
||||||
|
}
|
||||||
|
|
||||||
|
fout.close();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,56 @@
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstring>
|
||||||
|
|
||||||
|
#define START_OF_HEADER_MARKER 0xee1234ee
|
||||||
|
|
||||||
|
typedef struct Header {
|
||||||
|
uint32_t startOfHeaderMarker = START_OF_HEADER_MARKER;
|
||||||
|
uint32_t headerSize;
|
||||||
|
uint32_t fragmentSize;
|
||||||
|
uint32_t sourceIdentifier;
|
||||||
|
uint32_t runNumber;
|
||||||
|
uint32_t detectorEventNumber;
|
||||||
|
uint32_t numberOfStatusElements;
|
||||||
|
uint32_t *statusElementsArray;
|
||||||
|
/*friend std::ostream& operator <<(std::ostream& os, Header const& header)
|
||||||
|
{
|
||||||
|
return os << std::setw(8) << std::setfill('0') << header.startOfHeaderMarker << '\n'
|
||||||
|
<< std::setw(8) << std::setfill('0') << header.headerSize << '\n'
|
||||||
|
<< std::setw(8) << std::setfill('0') << header.fragmentSize << '\n'
|
||||||
|
<< std::setw(8) << std::setfill('0') << header.runNumber << '\n'
|
||||||
|
<< std::setw(8) << std::setfill('0') << header.detectorEventNumber << '\n'
|
||||||
|
<< std::setw(8) << std::setfill('0') << header.numberOfStatusElements << '\n';
|
||||||
|
}*/
|
||||||
|
} Header;
|
||||||
|
|
||||||
|
typedef struct Fragment {
|
||||||
|
Header header;
|
||||||
|
uint32_t *payloadElements;
|
||||||
|
} Fragment;
|
||||||
|
|
||||||
|
enum ERROR_CODES {
|
||||||
|
INCORRECT_ERROR = (1 << 0),
|
||||||
|
CORRUPTED_ERROR = (1 << 1),
|
||||||
|
MISSING_DATA_ERROR = (1 << 2),
|
||||||
|
TIMEOUT_ERROR = (1 << 3)
|
||||||
|
};
|
||||||
|
|
||||||
|
void encode_header(uint32_t *buffer, const Header &header) {
|
||||||
|
buffer[0] = header.startOfHeaderMarker;
|
||||||
|
buffer[1] = header.headerSize;
|
||||||
|
buffer[2] = header.fragmentSize;
|
||||||
|
buffer[3] = header.sourceIdentifier;
|
||||||
|
buffer[4] = header.runNumber;
|
||||||
|
buffer[5] = header.detectorEventNumber;
|
||||||
|
buffer[6] = header.numberOfStatusElements;
|
||||||
|
|
||||||
|
std::memcpy(&buffer[7], header.statusElementsArray, header.numberOfStatusElements * sizeof(uint32_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
void encode_fragment(uint32_t *buffer, const Fragment &fragment){
|
||||||
|
encode_header(buffer, fragment.header);
|
||||||
|
std::memcpy(&buffer[fragment.header.headerSize], fragment.payloadElements, (fragment.header.fragmentSize - fragment.header.headerSize) * sizeof(uint32_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
# Important notes on why to use level triggered mode in this scenario
|
||||||
|
For monitoring multiple clients sending data frequently, you should generally use level-triggered mode with epoll() for simplicity and reliability.
|
||||||
|
|
||||||
|
Level-Triggered Mode: In this mode, the epoll_wait() function returns whenever there is data available to read or write on a socket, as long as the socket remains ready. You use recv() or send() to read or write the data. This mode is more suitable for handling frequent data transmission, as the socket remains in the ready state until all available data is processed.
|
||||||
|
|
||||||
|
Edge-Triggered Mode: In this mode, the epoll_wait() function returns only when the socket transitions from not ready to ready. You need to process data until EAGAIN is received, at which point you should exit the loop and re-register the socket with EPOLLONESHOT to be notified again when the socket becomes ready again. This mode requires more careful management, as you must handle the data until no more is available, and it can lead to more complex code.
|
||||||
|
|
||||||
|
Given your description of continuous data transmission causing the application to hang in edge-triggered mode, and considering your preference for handling frequent data transmission, level-triggered mode seems like the better choice. It allows you to focus on processing data without worrying about handling the edge conditions and re-registering the socket.
|
||||||
|
|
||||||
|
From a theoretical standpoint:
|
||||||
|
|
||||||
|
Edge Triggered: This mode provides more fine-grained control and can potentially be more efficient, especially in scenarios where data arrival might be infrequent and you want to be notified only when there's a change in socket readiness.
|
||||||
|
|
||||||
|
Level Triggered: This mode simplifies handling continuous data flow, as the socket remains in the ready state as long as data is available, which is often more suitable for high-frequency data streams.
|
||||||
|
|
||||||
|
Ultimately, your choice should depend on the nature of your application and how you want to manage the data flow. While edge-triggered mode can be used with careful handling, level-triggered mode is generally simpler and more straightforward for frequent data transmission scenarios.
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
Exactly, you've got it right! In scenarios where you need to read and buffer all available data for later processing, level-triggered mode is the better choice. It ensures that you'll be continuously notified as long as there's data available to read, allowing you to consume and buffer the data without worrying about missing any part of it.
|
||||||
|
|
||||||
|
Level-triggered mode is particularly well-suited for scenarios where you want to handle data consumption and processing at your own pace, as the socket remains in the ready state as long as data is available, ensuring that you have the opportunity to read all available data before continuing with other tasks.
|
||||||
|
|
||||||
|
So, for your specific use case of reading all the data available and buffering it for later use, level-triggered mode with epoll() is a solid choice.
|
||||||
|
|
||||||
|
Yes, your approach of using one thread for epoll-based socket monitoring and another thread for data analysis using thread-safe queues is a valid and common way to handle this kind of scenario. Here's a high-level overview of how your setup would work:
|
||||||
|
|
||||||
|
Epoll Thread: This thread will use epoll() with level-triggered mode to monitor multiple sockets for incoming data. Whenever data is available to read on a socket, the epoll thread can read the data and enqueue it into the thread-safe queue.
|
||||||
|
|
||||||
|
Data Analysis Thread: This thread will be responsible for dequeuing the buffered data from the thread-safe queue and performing the necessary analysis on it. Since the queue is thread-safe, you won't have to worry about synchronization issues between the two threads.
|
||||||
|
|
||||||
|
This separation of concerns allows you to isolate the low-level socket I/O and buffering logic from the data analysis logic, making your application more modular and maintainable.
|
||||||
|
|
||||||
|
Keep in mind a few considerations:
|
||||||
|
|
||||||
|
Concurrency: While using multiple threads can improve parallelism, it also introduces complexity. Ensure that you handle thread synchronization and communication properly to avoid race conditions or deadlocks.
|
||||||
|
|
||||||
|
Resource Management: Manage your data buffers carefully to avoid excessive memory consumption, especially if data arrives rapidly. Consider setting limits on the size of your buffers and implementing buffer recycling mechanisms if needed.
|
||||||
|
|
||||||
|
Thread Safety: Make sure the data analysis thread doesn't access or modify the same data buffers that the epoll thread is using. This is where your thread-safe queue comes in handy, as it provides a clean boundary between the two threads.
|
||||||
|
|
||||||
|
Error Handling: Implement robust error handling mechanisms to gracefully handle unexpected situations, such as socket errors or issues with the thread-safe queue.
|
||||||
|
|
||||||
|
Overall, your proposed approach seems reasonable and can be an effective way to achieve your goal of efficiently buffering data for later analysis while utilizing multi-threading for improved performance.
|
||||||
|
|
||||||
|
|
Binary file not shown.
|
@ -0,0 +1,80 @@
|
||||||
|
#include <algorithm>
|
||||||
|
#include <arpa/inet.h>
|
||||||
|
#include <cstddef>
|
||||||
|
#include <cstdint>
|
||||||
|
#include <cstdio>
|
||||||
|
#include <cstdlib>
|
||||||
|
#include <cstring>
|
||||||
|
#include <ctime>
|
||||||
|
#include <netinet/in.h>
|
||||||
|
#include <sys/socket.h>
|
||||||
|
#include <csignal>
|
||||||
|
#include <iostream>
|
||||||
|
#include <iomanip>
|
||||||
|
#include <tuple>
|
||||||
|
#include <unistd.h>
|
||||||
|
|
||||||
|
|
||||||
|
int makeSocket() {
|
||||||
|
int sockfd;
|
||||||
|
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
|
||||||
|
perror("socket failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
return sockfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
void connectTo(int sock, const char* host, int port) {
|
||||||
|
struct sockaddr_in serv_addr;
|
||||||
|
serv_addr.sin_family = AF_INET;
|
||||||
|
serv_addr.sin_addr.s_addr = inet_addr(host);
|
||||||
|
serv_addr.sin_port = htons(port);
|
||||||
|
|
||||||
|
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
|
||||||
|
perror("Connection failed");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
printf("Connected to %s: %d\n", host, port);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char* argv[]) {
|
||||||
|
|
||||||
|
if (argc != 2) {
|
||||||
|
printf("Usage: ./prov.out timeout (ms)");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
}
|
||||||
|
|
||||||
|
int timeout = atoi(argv[1]);
|
||||||
|
//printf("Selected timeout: %f", static_cast<double>(timeout) / 1000);
|
||||||
|
|
||||||
|
int socket = makeSocket();
|
||||||
|
connectTo(socket, "127.0.0.1", 7777);
|
||||||
|
|
||||||
|
//allocating 100 megabytes of memory
|
||||||
|
uint64_t* chunky_boy = new uint64_t[67108];
|
||||||
|
size_t chunky_boy_size = 67108 * sizeof(uint64_t);
|
||||||
|
printf("chonky size %d", static_cast<int>(chunky_boy_size));
|
||||||
|
|
||||||
|
//setting memory to verify non-emptyness
|
||||||
|
memset(chunky_boy, 45678, chunky_boy_size);
|
||||||
|
|
||||||
|
int buffer_size = 1024 * 32;
|
||||||
|
|
||||||
|
|
||||||
|
for ( ;;) {
|
||||||
|
|
||||||
|
for ( int j = 0; j < chunky_boy_size; ) {
|
||||||
|
|
||||||
|
ssize_t bytes = send(socket, reinterpret_cast<char*>(chunky_boy) + j, std::min(static_cast<int>(chunky_boy_size) - j, buffer_size), 0);
|
||||||
|
if (timeout != 0) {
|
||||||
|
sleep(static_cast<double>(timeout) / 1000);
|
||||||
|
}
|
||||||
|
j += buffer_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,17 @@
|
||||||
|
this test wants to see how much data is possible to gather in 60 seconds with select and epoll.
|
||||||
|
For that reason, I'm removing the random time between provider data generation and send it asap.
|
||||||
|
I'll spawn 50 clients and let them connect and send data as fast as they can while I measure total received data on server side.
|
||||||
|
I'm ignoring all bottlenecks for now. All tests of this section are brought out on my local machine.
|
||||||
|
|
||||||
|
EDIT: by default all clients allocate a massive 100 megabytes of space to send big chunks of data.
|
||||||
|
To test with a bigger number of descriptors (I'm making 100, 500, 1000, 5000, 10000) i'm lowering the chunk size as well as changing
|
||||||
|
the generation method of the data to send.
|
||||||
|
So trashing first result with 50 and re executing with all the different clien numbers.
|
||||||
|
|
||||||
|
|
||||||
|
since no big performance difference (as expected, since with all clients active also epoll has to iterate through everything since it gets notified from all),
|
||||||
|
retesting with timeout on majority of clients. files with *_TIMEOUT.csv have only 50 clients without timeout, files with *TIMEOUT_HARD.csv have only 2 clients.
|
||||||
|
The timeout is pretty hard, we're talking 1 second, to hilight the difference in performance.
|
||||||
|
With epoll we see a big rise in throughput with the 2 clients, since we kill cpu time for iteration and optimize the 30 seconds time analysis,
|
||||||
|
in the 50 clients it's not so evident. With 50 we start to see already big improvements with epoll vs select.
|
||||||
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30001;31734;
|
||||||
|
500;30000.3;5573836;
|
||||||
|
1000;30000;8694627;
|
||||||
|
2000;30000.1;14935890;
|
||||||
|
3000;30000.1;21792013;
|
||||||
|
4000;30000.1;24816724;
|
||||||
|
5000;30000.1;28805466;
|
||||||
|
10000;30000;46161885;
|
||||||
|
20000;30000.6;61985281;
|
||||||
|
30000;30000.9;60864280;
|
||||||
|
40000;30000.6;70116721;
|
||||||
|
50000;30000.4;79858909;
|
||||||
|
100000;30001.1;77035845;
|
||||||
|
200000;30001.2;98608909;
|
||||||
|
300000;30001.6;111016135;
|
||||||
|
400000;30002.2;109526878;
|
||||||
|
500000;30005.3;129750689;
|
||||||
|
1000000;30005;101069846;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.1;30805;
|
||||||
|
500;30000;1887572;
|
||||||
|
1000;30000;1390681;
|
||||||
|
2000;30000;1886540;
|
||||||
|
3000;30000;2337091;
|
||||||
|
4000;30000;2781395;
|
||||||
|
5000;30000;3236316;
|
||||||
|
10000;30000.1;5596495;
|
||||||
|
20000;30000;9857822;
|
||||||
|
30000;30000;13786084;
|
||||||
|
40000;30000.2;15183408;
|
||||||
|
50000;30000.5;16604221;
|
||||||
|
100000;30000.1;25196651;
|
||||||
|
200000;30000.2;44064204;
|
||||||
|
300000;30000;47852732;
|
||||||
|
400000;30000.4;51052155;
|
||||||
|
500000;30000.6;50078987;
|
||||||
|
1000000;30000.3;70992968;
|
|
|
@ -0,0 +1,15 @@
|
||||||
|
#!/bin/bash
|
||||||
|
echo "Usage: $0 host port runNumber numberOfProviders"
|
||||||
|
if [ $# -eq 1 ]
|
||||||
|
then
|
||||||
|
for i in $(seq 1 $1);
|
||||||
|
do
|
||||||
|
echo "Spawning provider number $i"
|
||||||
|
if [ $i -le 2 ]
|
||||||
|
then
|
||||||
|
./prov.out 0&
|
||||||
|
else
|
||||||
|
./prov.out 1000&
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
fi
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
500;30000;923066;
|
||||||
|
500;30000;1246851;
|
||||||
|
500;30000;1224399;
|
||||||
|
500;30000;1236942;
|
||||||
|
500;30000;1237006;
|
||||||
|
500;30000;1240083;
|
||||||
|
500;30000;1243543;
|
||||||
|
500;30000;1247446;
|
||||||
|
500;30000;1249498;
|
||||||
|
500;30000;1242588;
|
||||||
|
500;30000;1246731;
|
||||||
|
500;30000;1249731;
|
||||||
|
500;30000;1245551;
|
||||||
|
500;30000;1251264;
|
||||||
|
500;30000;1257202;
|
||||||
|
500;30000;1253106;
|
||||||
|
500;30000;1250601;
|
||||||
|
500;30000;1249947;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.5;30019;
|
||||||
|
500;30000.3;12629492;
|
||||||
|
1000;30003.2;20228484;
|
||||||
|
2000;30000.9;31157526;
|
||||||
|
3000;30000.5;38823216;
|
||||||
|
4000;30000.6;44107121;
|
||||||
|
5000;30000;45085442;
|
||||||
|
10000;30000.1;60564704;
|
||||||
|
20000;30001.2;72663492;
|
||||||
|
30000;30005.5;72002012;
|
||||||
|
40000;30000.2;77138855;
|
||||||
|
50000;30001.6;88049703;
|
||||||
|
100000;30005.6;101391654;
|
||||||
|
200000;30001.9;99283463;
|
||||||
|
300000;30014.1;91602483;
|
||||||
|
400000;30004.2;113396854;
|
||||||
|
500000;30008.1;112980666;
|
||||||
|
1000000;30015.8;105931378;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000;42850;
|
||||||
|
500;30000.1;15883661;
|
||||||
|
1000;30000;27186324;
|
||||||
|
2000;30000.2;41545982;
|
||||||
|
3000;30000.4;47881778;
|
||||||
|
4000;30004.2;54284932;
|
||||||
|
5000;30006.1;55478462;
|
||||||
|
10000;30000.1;95230218;
|
||||||
|
20000;30000.2;105701171;
|
||||||
|
30000;30000;103435546;
|
||||||
|
40000;30000.9;114492187;
|
||||||
|
50000;30000.7;112895507;
|
||||||
|
100000;30001;113681640;
|
||||||
|
200000;30000.6;113457031;
|
||||||
|
300000;30002.4;128701171;
|
||||||
|
400000;30002.3;129140625;
|
||||||
|
500000;30014.2;133642578;
|
||||||
|
1000000;30022;124511718;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000;39454;
|
||||||
|
500;30000.1;16157269;
|
||||||
|
1000;30000;27153432;
|
||||||
|
2000;30000.1;40100085;
|
||||||
|
3000;30000;48828124;
|
||||||
|
4000;30000.1;55680401;
|
||||||
|
5000;30000;61979270;
|
||||||
|
10000;30000.3;98380003;
|
||||||
|
20000;30000.1;106611328;
|
||||||
|
30000;30000.7;107302734;
|
||||||
|
40000;30000.2;101027343;
|
||||||
|
50000;30000;111542968;
|
||||||
|
100000;30001;102851562;
|
||||||
|
200000;30004.2;118359375;
|
||||||
|
300000;30003;122314453;
|
||||||
|
400000;30008.9;118750000;
|
||||||
|
500000;30009.5;118750000;
|
||||||
|
1000000;30020.8;117871093;
|
|
|
@ -0,0 +1,19 @@
|
||||||
|
buffer_size;time;total_received_data;
|
||||||
|
1;30000.2;31884;
|
||||||
|
500;30000.2;14397128;
|
||||||
|
1000;30000.4;23282151;
|
||||||
|
2000;30001;34224109;
|
||||||
|
3000;30000.5;42075605;
|
||||||
|
4000;30000;47592080;
|
||||||
|
5000;30000.5;50634489;
|
||||||
|
10000;30002.2;62469144;
|
||||||
|
20000;30013.4;65980444;
|
||||||
|
30000;30003.8;66720000;
|
||||||
|
40000;30005.9;74623936;
|
||||||
|
50000;30001.5;103636705;
|
||||||
|
100000;30002.9;111512675;
|
||||||
|
200000;30002.2;107186640;
|
||||||
|
300000;30013.8;111613113;
|
||||||
|
400000;30005.9;105724495;
|
||||||
|
500000;30008.1;106815424;
|
||||||
|
1000000;30037.6;108984396;
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue