diff --git a/testing/is_epoll_really_thsafe/event_builder_epoll_multhread.cxx b/testing/is_epoll_really_thsafe/event_builder_epoll_multhread.cxx new file mode 100644 index 0000000..aa8cc98 --- /dev/null +++ b/testing/is_epoll_really_thsafe/event_builder_epoll_multhread.cxx @@ -0,0 +1,351 @@ +#include +#include +#include +#include +#include +#include // for fprintf() +#include +#include +#include +#include // for close(), read() +#include // for epoll_create1(), epoll_ctl(), struct epoll_event +#include // for strncmp + +//my addition to the online guide +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + + +#define MAX_EVENTS 20000 + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void bindSocketPort(int server_fd, int port) { + struct sockaddr_in localAddr; + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = INADDR_ANY; + localAddr.sin_port = htons(port); + + if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { + perror("bind failed"); + exit(EXIT_FAILURE); + } + printf("FD %d bound to port %d\n", server_fd, port); +} + +void startListening(int server_fd) { + if (listen(server_fd, 20000) < 0) { + perror("listen"); + exit(EXIT_FAILURE); + } + printf("FD %d listening to new connections\n", server_fd); +} + +int acceptConnection(int server_fd) { + int client_fd; + struct sockaddr_in remoteAddr; + + size_t addrlen = sizeof(remoteAddr); + if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) { + perror("accept"); + exit(EXIT_FAILURE); + } else { + int flags = fcntl(client_fd, F_GETFL); + fcntl(client_fd, F_SETFL, flags | O_NONBLOCK); + } + printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd); + return client_fd; +} + +void acceptConnectionEpollStyle(int server_fd, int &efd) { + struct sockaddr_in new_remoteAddr; + int addrlen = sizeof(struct sockaddr_in); + + while (true) { + int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen); + + if (conn_sock == -1) { + // All incoming connections have been processed + if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) { + break; + } else { + perror("accept"); + break; + } + } + + // make new connection non-blocking + int flags = fcntl(conn_sock, F_GETFL, 0); + fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK); + + // monitor new connection for read events, always in edge triggered + struct epoll_event event; + event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; + event.data.fd = conn_sock; + + // Allow epoll to monitor the new connection + if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) { + perror("epoll_ctl: conn_sock"); + break; + } + printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock); + + } +} + +void term_handler(int signal) { + printf("Terminated, received SIGNAL %d", signal); + exit(EXIT_SUCCESS); +} + +std::atomic grandtotal_kb; + + +void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_flag, const int thread_index) { + epoll_event events[MAX_EVENTS]; + uint64_t bytes_read = 0; + uint64_t kBytes_read = 0; + + while (true) { + if (th_flag == 1) { + grandtotal_kb += kBytes_read; + break; + } + // Time measurements + ///auto start = std::chrono::high_resolution_clock::now(); + + // Returns only the sockets for which there are events + //printf("Before wait\n"); + int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1); + //printf("After wait\n"); + if (nfds == -1) { + perror("epoll_wait"); + exit(EXIT_FAILURE); + } + + // Iterate on the sockets having events + for (int i = 0; i < nfds; i++) { + //printf("Tot fds = %d reading from %d\n", nfds, i); + int fd = events[i].data.fd; + if (fd == master_socket) { + // If the activity is on the master socket, than it's a new connection request + acceptConnectionEpollStyle(master_socket, epoll_fd); + + } else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) { + // Than the client connection is closed, so I close it + printf("Closing %d", fd); + close(fd); + } else { + // Than we received data from one of the monitored sockets + char buffer[buf_size]; + int valread = 0; + //while (valread != EAGAIN) { + valread = recv(fd, &buffer, buf_size, 0); + if (valread > 0) { + //printf("[RICEVUTO]\t FROM %d\n", fd); + bytes_read += valread; + int kilos = 0; + if ((kilos = bytes_read / 1024) > 0) { + kBytes_read += kilos; + bytes_read -= (kilos * 1024); + //printf("reade bites %lu", bytes_read); + } + } + //} + + } + } + + ///auto end = std::chrono::high_resolution_clock::now(); + ///double time_taken = std::chrono::duration_cast(end - start).count(); + //time taken in milliseconds + /// + /*time_taken *= 1e-6; + total_time_taken += time_taken; + + if (total_time_taken > 3e4) { + times.push_back(total_time_taken); + tot_received_data.push_back(kBytes_read); + break; + }*/ + /// + } + +} + + +int main(int argc, char const *argv[]) { + + signal(SIGTERM, term_handler); + + + if (argc != 2) { + printf("Usage: %s portNumber \n", argv[0]); + exit(EXIT_FAILURE); + } + int port = atoi(argv[1]); + printf("Start socket port %d\n", port); + + int master_socket; + const int opt = 1; + + + master_socket = makeSocket(); + + //set master socket to allow multiple connections , + //this is just a good habit, it will work without this + if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, + sizeof(opt)) < 0 ) + { + perror("setsockopt"); + exit(EXIT_FAILURE); + } + + bindSocketPort(master_socket, port); + startListening(master_socket); + + int flags = fcntl(master_socket, F_GETFL, 0); + fcntl(master_socket, F_SETFL, flags | O_NONBLOCK); + + epoll_event ev, events[MAX_EVENTS]; + std::array mutex_array; + std::array kBytes_read_on_descr; + + //The atomic here is used as a flag to tell the thread to stop + std::vector vThreads; + + //create the epoll instance + int epoll_fd = epoll_create1(0); + if (epoll_fd == -1) { + printf("Failed to create epoll file descriptor\n"); + exit(EXIT_FAILURE); + } + + ev.data.fd = master_socket; + // Reading events with edge triggered mode + ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET; + + // Allowing epoll to monitor the master_socket + if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){ + perror("epoll_ctl"); + exit(EXIT_FAILURE); + } + + std::vector sizes; + std::vector tot_received_data; + std::vector times; + + grandtotal_kb = 0; + int increment = 499; + + for (int buf_size = 1; buf_size < 1e6 + 1; ) { + switch (buf_size) { + case 500: + increment = 500; + break; + case (int) 1e3: + increment = 1e3; + break; + case (int) 5e3: + increment = 5e3; + break; + case (int) 1e4: + increment = 1e4; + break; + case (int) 5e4: + increment = 5e4; + break; + case (int) 1e5: + increment = 1e5; + break; + case (int) 5e5: + increment = 5e5; + break; + } + printf("Next increment %d with current i: %d\n", increment, buf_size); + + std::array vT; + std::array thread_flags; + + for (int t_i = 0; t_i < 4; t_i++) { + thread_flags[t_i] = 0; + vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), buf_size, std::cref(thread_flags.at(t_i)), t_i); + } + + std::string command = ""; + std::string ref("go"); + if ( increment == 499){ + while (command != "go") { + std::cout << "Insert command: "; + std::cin >> command; + std::cout << command << std::endl; + } + } + + std::cout << "starting measurement with current buf size: " << buf_size << std::endl; + + auto start = std::chrono::high_resolution_clock::now(); + grandtotal_kb = 0; + + sleep(30); + for (int t_i = 0; t_i < 4; t_i++) { + thread_flags[t_i] = 1; + vT[t_i].join(); + } + uint64_t local_kB = grandtotal_kb; + auto end = std::chrono::high_resolution_clock::now(); + double time_taken = std::chrono::duration_cast(end - start).count(); + //time taken in milliseconds + time_taken *= 1e-6; + + times.push_back(time_taken); + sizes.push_back(buf_size); + tot_received_data.push_back(local_kB); + + buf_size += increment; + } + + + + std::ofstream fout; + fout.open("epoll_data_stats_1000_multith_TIMEOUT.csv"); + //the time is in milliseconds and the data in kbytes + fout << "buffer_size;time;total_received_data;\n"; + auto iter_sizes = sizes.begin(); + auto iter_times = times.begin(); + auto iter_data = tot_received_data.begin(); + + for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) { + fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n"; + } + + fout.close(); + + if (close(epoll_fd)) { + printf("Failed to close epoll file descriptor"); + exit(EXIT_FAILURE); + } + + return 0; +} \ No newline at end of file diff --git a/testing/is_epoll_really_thsafe/provider.cxx b/testing/is_epoll_really_thsafe/provider.cxx new file mode 100644 index 0000000..a7b2038 --- /dev/null +++ b/testing/is_epoll_really_thsafe/provider.cxx @@ -0,0 +1,82 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +int makeSocket() { + int sockfd; + if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { + perror("socket failed"); + exit(EXIT_FAILURE); + } + return sockfd; +} + +void connectTo(int sock, const char* host, int port) { + struct sockaddr_in serv_addr; + serv_addr.sin_family = AF_INET; + serv_addr.sin_addr.s_addr = inet_addr(host); + serv_addr.sin_port = htons(port); + + if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) { + perror("Connection failed"); + exit(EXIT_FAILURE); + } + + printf("Connected to %s: %d\n", host, port); +} + +int main(int argc, char* argv[]) { + + if (argc != 2) { + printf("Usage: ./prov.out timeout (ms)"); + exit(EXIT_FAILURE); + } + + int timeout = atoi(argv[1]); + //printf("Selected timeout: %f", static_cast(timeout) / 1000); + + int socket = makeSocket(); + connectTo(socket, "127.0.0.1", 7777); + + //allocating 100 megabytes of memory + uint64_t* chunky_boy = new uint64_t[67108]; + size_t chunky_boy_size = 67108 * sizeof(uint64_t); + printf("chonky size %d", static_cast(chunky_boy_size)); + + //setting memory to verify non-emptyness + memset(chunky_boy, 45678, chunky_boy_size); + + int buffer_size = 1024 * 32; + + + for ( ;;) { + + for ( int j = 0; j < chunky_boy_size; ) { + + ssize_t bytes = send(socket, reinterpret_cast(chunky_boy) + j, std::min(static_cast(chunky_boy_size) - j, buffer_size), 0); + if (timeout != 0) { + sleep(timeout); + } + j += buffer_size; + } + + //usleep(1000); + + } + + + return 0; +} \ No newline at end of file diff --git a/testing/is_epoll_really_thsafe/readme.md b/testing/is_epoll_really_thsafe/readme.md new file mode 100644 index 0000000..21eab27 --- /dev/null +++ b/testing/is_epoll_really_thsafe/readme.md @@ -0,0 +1,6 @@ +# Is epoll really thread safe? + +### Motivation +Scope of this test is simply to verify that TCP communication with epoll doesen't mix data. In particular, it's important to verify that if one big chunk is sent, the TCP protocol doesen't split it into different events for level triggered epoll (ofc it's gonna be splitted due to buf size). + +The important thing is that if I send 1MB of data, it doesen't wake up 2 threads listening to the same fd just because TCP split the package (afaik that shouln't happen, but it's always better to verify, since [`epoll()` is fundamentally broken](https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/)) \ No newline at end of file