diff --git a/testing/connection_method/readme.txt b/testing/connection_method/readme.txt index be42d79..0ebc290 100644 --- a/testing/connection_method/readme.txt +++ b/testing/connection_method/readme.txt @@ -15,3 +15,5 @@ The timeout is pretty hard, we're talking 1 second, to hilight the difference in With epoll we see a big rise in throughput with the 2 clients, since we kill cpu time for iteration and optimize the 30 seconds time analysis, in the 50 clients it's not so evident. With 50 we start to see already big improvements with epoll vs select. +Now I try to multithread epoll, to see if the data acquired increases. I'll measure the full bandwith received, I expect a semi-linear increase in performance. +For the multithreading thread check the threads folder \ No newline at end of file diff --git a/testing/connection_method/epoll_data_stats_100.csv b/testing/results/epoll_data_stats_100.csv similarity index 100% rename from testing/connection_method/epoll_data_stats_100.csv rename to testing/results/epoll_data_stats_100.csv diff --git a/testing/connection_method/epoll_data_stats_1000.csv b/testing/results/epoll_data_stats_1000.csv similarity index 100% rename from testing/connection_method/epoll_data_stats_1000.csv rename to testing/results/epoll_data_stats_1000.csv diff --git a/testing/connection_method/epoll_data_stats_1000_TIMEOUT.csv b/testing/results/epoll_data_stats_1000_TIMEOUT.csv similarity index 100% rename from testing/connection_method/epoll_data_stats_1000_TIMEOUT.csv rename to testing/results/epoll_data_stats_1000_TIMEOUT.csv diff --git a/testing/connection_method/epoll_data_stats_1000_TIMEOUT_HARD.csv b/testing/results/epoll_data_stats_1000_TIMEOUT_HARD.csv similarity index 100% rename from testing/connection_method/epoll_data_stats_1000_TIMEOUT_HARD.csv rename to testing/results/epoll_data_stats_1000_TIMEOUT_HARD.csv diff --git a/testing/results/epoll_data_stats_1000_multith.csv b/testing/results/epoll_data_stats_1000_multith.csv new file mode 100644 index 0000000..147718c --- /dev/null +++ b/testing/results/epoll_data_stats_1000_multith.csv @@ -0,0 +1,19 @@ +buffer_size;time;total_received_data; +1;30001.8;121453; +500;30001.8;38315920; +1000;30013.8;60329152; +2000;30005.2;74660074; +3000;30005.9;85193274; +4000;30006.5;96597573; +5000;30008.5;106440822; +10000;30011.8;137147617; +20000;30007.9;176761989; +30000;30006.4;156482849; +40000;30014;177159351; +50000;30049.3;171875036; +100000;30053.9;224264206; +200000;30048.3;225628771; +300000;30121;180069213; +400000;30016.7;156804334; +500000;30000.6;230636978; +1000000;30135;159812471; diff --git a/testing/results/epoll_data_stats_1000_multith_TIMEOUT.csv b/testing/results/epoll_data_stats_1000_multith_TIMEOUT.csv new file mode 100644 index 0000000..cbcb9d5 --- /dev/null +++ b/testing/results/epoll_data_stats_1000_multith_TIMEOUT.csv @@ -0,0 +1,19 @@ +buffer_size;time;total_received_data; +1;30002.1;140629; +500;30003.8;31440572; +1000;30004.1;48545615; +2000;30010.9;65841163; +3000;30011.9;79191486; +4000;30008.6;83061533; +5000;30011.5;82064998; +10000;30005.2;96952555; +20000;30007;132912119; +30000;30005.2;138298731; +40000;30017.7;138156188; +50000;30017.2;139215949; +100000;30051.2;139854062; +200000;30008.9;146022695; +300000;30061.5;142890809; +400000;30083.1;138006324; +500000;30074.4;133424935; +1000000;30000.5;114451148; diff --git a/testing/results/epoll_data_stats_1000_multith_TIMEOUT_HARD.csv b/testing/results/epoll_data_stats_1000_multith_TIMEOUT_HARD.csv new file mode 100644 index 0000000..0b7500d --- /dev/null +++ b/testing/results/epoll_data_stats_1000_multith_TIMEOUT_HARD.csv @@ -0,0 +1,19 @@ +buffer_size;time;total_received_data; +1;30002.1;163986; +500;30019;34088098; +1000;30005.1;50801994; +2000;30006.2;69301651; +3000;30005.4;80399265; +4000;30012.7;90123946; +5000;30010.8;98647263; +10000;30011.7;116325174; +20000;30049.7;134726431; +30000;30025;144562322; +40000;30028.7;147823821; +50000;30012.3;146218628; +100000;30069.2;151376815; +200000;30011.5;138831476; +300000;30022.2;145632228; +400000;30096.8;146447339; +500000;30143.2;134063359; +1000000;30000.4;123787063; diff --git a/testing/connection_method/epoll_data_stats_500.csv b/testing/results/epoll_data_stats_500.csv similarity index 100% rename from testing/connection_method/epoll_data_stats_500.csv rename to testing/results/epoll_data_stats_500.csv diff --git a/testing/connection_method/select_data_stats_1000desc_NOBLOCK_TIMEOUT.csv b/testing/results/select_data_stats_1000desc_NOBLOCK_TIMEOUT.csv similarity index 100% rename from testing/connection_method/select_data_stats_1000desc_NOBLOCK_TIMEOUT.csv rename to testing/results/select_data_stats_1000desc_NOBLOCK_TIMEOUT.csv diff --git a/testing/connection_method/select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv b/testing/results/select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv similarity index 100% rename from testing/connection_method/select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv rename to testing/results/select_data_stats_1000desc_NOBLOCK_TIMEOUT_HARD.csv diff --git a/testing/threads/ev_build.out b/testing/threads/ev_build.out new file mode 100755 index 0000000..7c9cac0 Binary files /dev/null and b/testing/threads/ev_build.out differ diff --git a/testing/threads/event_builder_epoll_multhread.cxx b/testing/threads/event_builder_epoll_multhread.cxx new file mode 100644 index 0000000..aa8cc98 --- /dev/null +++ b/testing/threads/event_builder_epoll_multhread.cxx @@ -0,0 +1,351 @@ +#include +#include +#include +#include +#include +#include // for fprintf() +#include +#include +#include +#include // for close(), read() +#include // for epoll_create1(), epoll_ctl(), struct epoll_event +#include // for strncmp + +//my addition to the online guide +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +#define MAX_EVENTS 20000 + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void bindSocketPort(int server_fd, int port) { + struct sockaddr_in localAddr; + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(port); + + if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { + perror("bind failed"); + exit(EXIT_FAILURE); + } + printf("FD %d bound to port %d\n", server_fd, port); +} + +void startListening(int server_fd) { + if (listen(server_fd, 20000) < 0) { + perror("listen"); + exit(EXIT_FAILURE); + } + printf("FD %d listening to new connections\n", server_fd); +} + +int acceptConnection(int server_fd) { + int client_fd; + struct sockaddr_in remoteAddr; + + size_t addrlen = sizeof(remoteAddr); + if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) { + perror("accept"); + exit(EXIT_FAILURE); + } else { + int flags = fcntl(client_fd, F_GETFL); + fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); + } + printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd); + return client_fd; +} + +void acceptConnectionEpollStyle(int server_fd, int &efd) { + struct sockaddr_in new_remoteAddr; + int addrlen = sizeof(struct sockaddr_in); + + while (true) { + int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen); + + if (conn_sock == -1) { + // All incoming connections have been processed + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + break; + } else { + perror("accept"); + break; + } + } + + // make new connection non-blocking + int flags = fcntl(conn_sock, F_GETFL, 0); + fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK); + + // monitor new connection for read events, always in edge triggered + struct epoll_event event; + event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; + event.data.fd = conn_sock; + + // Allow epoll to monitor the new connection + if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) { + perror("epoll_ctl: conn_sock"); + break; + } + printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock); + + } +} + +void term_handler(int signal) { + printf("Terminated, received SIGNAL %d", signal); + exit(EXIT_SUCCESS); +} + +std::atomic grandtotal_kb; + + +void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_flag, const int thread_index) { + epoll_event events[MAX_EVENTS]; + uint64_t bytes_read = 0; + uint64_t kBytes_read = 0; + + while (true) { + if (th_flag == 1) { + grandtotal_kb += kBytes_read; + break; + } + // Time measurements + ///auto start = std::chrono::high_resolution_clock::now(); + + // Returns only the sockets for which there are events + //printf("Before wait\n"); + int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + //printf("After wait\n"); + if (nfds == -1) { + perror("epoll_wait"); + exit(EXIT_FAILURE); + } + + // Iterate on the sockets having events + for (int i = 0; i < nfds; i++) { + //printf("Tot fds = %d reading from %d\n", nfds, i); + int fd = events[i].data.fd; + if (fd == master_socket) { + // If the activity is on the master socket, than it's a new connection request + acceptConnectionEpollStyle(master_socket, epoll_fd); + + } else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { + // Than the client connection is closed, so I close it + printf("Closing %d", fd); + close(fd); + } else { + // Than we received data from one of the monitored sockets + char buffer[buf_size]; + int valread = 0; + //while (valread != EAGAIN) { + valread = recv(fd, &buffer, buf_size, 0); + if (valread > 0) { + //printf("[RICEVUTO]\t FROM %d\n", fd); + bytes_read += valread; + int kilos = 0; + if ((kilos = bytes_read / 1024) > 0) { + kBytes_read += kilos; + bytes_read -= (kilos * 1024); + //printf("reade bites %lu", bytes_read); + } + } + //} + + } + } + + ///auto end = std::chrono::high_resolution_clock::now(); + ///double time_taken = std::chrono::duration_cast(end - start).count(); + //time taken in milliseconds + /// + /*time_taken *= 1e-6; + total_time_taken += time_taken; + + if (total_time_taken > 3e4) { + times.push_back(total_time_taken); + tot_received_data.push_back(kBytes_read); + break; + }*/ + /// + } + +} + + +int main(int argc, char const *argv[]) { + + signal(SIGTERM, term_handler); + + + if (argc != 2) { + printf("Usage: %s portNumber \n", argv[0]); + exit(EXIT_FAILURE); + } + int port = atoi(argv[1]); + printf("Start socket port %d\n", port); + + int master_socket; + const int opt = 1; + + + master_socket = makeSocket(); + + //set master socket to allow multiple connections , + //this is just a good habit, it will work without this + if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, + sizeof(opt)) < 0 ) + { + perror("setsockopt"); + exit(EXIT_FAILURE); + } + + bindSocketPort(master_socket, port); + startListening(master_socket); + + int flags = fcntl(master_socket, F_GETFL, 0); + fcntl(master_socket, F_SETFL, flags | O_NONBLOCK); + + epoll_event ev, events[MAX_EVENTS]; + std::array mutex_array; + std::array kBytes_read_on_descr; + + //The atomic here is used as a flag to tell the thread to stop + std::vector vThreads; + + //create the epoll instance + int epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + printf("Failed to create epoll file descriptor\n"); + exit(EXIT_FAILURE); + } + + ev.data.fd = master_socket; + // Reading events with edge triggered mode + ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; + + // Allowing epoll to monitor the master_socket + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){ + perror("epoll_ctl"); + exit(EXIT_FAILURE); + } + + std::vector sizes; + std::vector tot_received_data; + std::vector times; + + grandtotal_kb = 0; + int increment = 499; + + for (int buf_size = 1; buf_size < 1e6 + 1; ) { + switch (buf_size) { + case 500: + increment = 500; + break; + case (int) 1e3: + increment = 1e3; + break; + case (int) 5e3: + increment = 5e3; + break; + case (int) 1e4: + increment = 1e4; + break; + case (int) 5e4: + increment = 5e4; + break; + case (int) 1e5: + increment = 1e5; + break; + case (int) 5e5: + increment = 5e5; + break; + } + printf("Next increment %d with current i: %d\n", increment, buf_size); + + std::array vT; + std::array thread_flags; + + for (int t_i = 0; t_i < 4; t_i++) { + thread_flags[t_i] = 0; + vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), buf_size, std::cref(thread_flags.at(t_i)), t_i); + } + + std::string command = ""; + std::string ref("go"); + if ( increment == 499){ + while (command != "go") { + std::cout << "Insert command: "; + std::cin >> command; + std::cout << command << std::endl; + } + } + + std::cout << "starting measurement with current buf size: " << buf_size << std::endl; + + auto start = std::chrono::high_resolution_clock::now(); + grandtotal_kb = 0; + + sleep(30); + for (int t_i = 0; t_i < 4; t_i++) { + thread_flags[t_i] = 1; + vT[t_i].join(); + } + uint64_t local_kB = grandtotal_kb; + auto end = std::chrono::high_resolution_clock::now(); + double time_taken = std::chrono::duration_cast(end - start).count(); + //time taken in milliseconds + time_taken *= 1e-6; + + times.push_back(time_taken); + sizes.push_back(buf_size); + tot_received_data.push_back(local_kB); + + buf_size += increment; + } + + + + std::ofstream fout; + fout.open("epoll_data_stats_1000_multith_TIMEOUT.csv"); + //the time is in milliseconds and the data in kbytes + fout << "buffer_size;time;total_received_data;\n"; + auto iter_sizes = sizes.begin(); + auto iter_times = times.begin(); + auto iter_data = tot_received_data.begin(); + + for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) { + fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n"; + } + + fout.close(); + + if (close(epoll_fd)) { + printf("Failed to close epoll file descriptor"); + exit(EXIT_FAILURE); + } + + return 0; +} \ No newline at end of file diff --git a/testing/threads/prov.out b/testing/threads/prov.out new file mode 100755 index 0000000..47dc96b Binary files /dev/null and b/testing/threads/prov.out differ diff --git a/testing/threads/provider.cxx b/testing/threads/provider.cxx new file mode 100644 index 0000000..a7b2038 --- /dev/null +++ b/testing/threads/provider.cxx @@ -0,0 +1,82 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void connectTo(int sock, const char* host, int port) { + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host); + serv_addr.sin_port = htons(port); + + if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + perror("Connection failed"); + exit(EXIT_FAILURE); + } + + printf("Connected to %s: %d\n", host, port); +} + +int main(int argc, char* argv[]) { + + if (argc != 2) { + printf("Usage: ./prov.out timeout (ms)"); + exit(EXIT_FAILURE); + } + + int timeout = atoi(argv[1]); + //printf("Selected timeout: %f", static_cast(timeout) / 1000); + + int socket = makeSocket(); + connectTo(socket, "127.0.0.1", 7777); + + //allocating 100 megabytes of memory + uint64_t* chunky_boy = new uint64_t[67108]; + size_t chunky_boy_size = 67108 * sizeof(uint64_t); + printf("chonky size %d", static_cast(chunky_boy_size)); + + //setting memory to verify non-emptyness + memset(chunky_boy, 45678, chunky_boy_size); + + int buffer_size = 1024 * 32; + + + for ( ;;) { + + for ( int j = 0; j < chunky_boy_size; ) { + + ssize_t bytes = send(socket, reinterpret_cast(chunky_boy) + j, std::min(static_cast(chunky_boy_size) - j, buffer_size), 0); + if (timeout != 0) { + sleep(timeout); + } + j += buffer_size; + } + + //usleep(1000); + + } + + + return 0; +} \ No newline at end of file diff --git a/testing/threads/readme.md b/testing/threads/readme.md new file mode 100644 index 0000000..963c1e4 --- /dev/null +++ b/testing/threads/readme.md @@ -0,0 +1,18 @@ +# Multithreaded epoll() + +### Not a naïve implementation +To correctly set a multithreaded `epoll()` it's necessary to specifically set some flags. We'll consider level triggered mode since that's what we're using. Since we'll split the requests on different worker threads I expect the performance to scale semi-linearly with the number of workers. + +The provider implementation doesen't change. I'll spawn providers with no timeout. + +#### Results +With 4 threads we manage to see from ~1.5x to ~3x increase in performance without timeout. In prod we can maximize cpu cores using for read 8-n-1 threads, where 1 is for the main thread and n is the number of thrads used to build the full event (probably would be n=1). + +With timeout, I dont see improvement with the TIMEOUT_HARD (only 2 clients sending continuously), actually some worse perfomance in some cases. Prob this is due the EPOLLEXCLUSIVE which waste time distributing on more threads. +With the TIMEOUD (50 clients sending continuously) I get some improvement, but not comparable to the all continuous ones. That's probably because of the fact that i'm getting some sort of tradeoff between exec time in incoming connection and time needed to distribute load on threads. + +In an environment with a lot of clients and short timeouts, the approach with more than one thread is obviously preferrable for scalability reasons (50 cont + 950 1 sec timeout shows improvement already). + +In such enviroment, a multito avoid reordering.hread instance should use mutexes on read data to avoid reordering of received data. It's gonna be explained in detail during the presentations. + +(thats to study a little better) diff --git a/testing/threads/spawn_clients.sh b/testing/threads/spawn_clients.sh new file mode 100755 index 0000000..023b916 --- /dev/null +++ b/testing/threads/spawn_clients.sh @@ -0,0 +1,15 @@ +#!/bin/bash +echo "Usage: $0 host port runNumber numberOfProviders" +if [ $# -eq 1 ] +then + for i in $(seq 1 $1); + do + echo "Spawning provider number $i" + if [ $i -le 50 ] + then + ./prov.out 1& + else + ./prov.out 0& + fi + done +fi