Added epoll test on thread safety

dev
MasterRoby3 2023-08-27 16:18:17 +02:00
parent 50234b1d13
commit a287ea3d92
3 changed files with 439 additions and 0 deletions

View File

@ -0,0 +1,351 @@
#include <asm-generic/errno-base.h>
#include <asm-generic/errno.h>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio> // for fprintf()
#include <functional>
#include <iostream>
#include <ratio>
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <cstring> // for strncmp
//my addition to the online guide
#include <csignal>
#include <cstdlib>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <utility>
#include <vector>
#include <chrono>
#include <fstream>
#include <thread>
#include <mutex>
#include <atomic>
#include <array>
#define MAX_EVENTS 20000
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void bindSocketPort(int server_fd, int port) {
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("FD %d bound to port %d\n", server_fd, port);
}
void startListening(int server_fd) {
if (listen(server_fd, 20000) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
printf("FD %d listening to new connections\n", server_fd);
}
int acceptConnection(int server_fd) {
int client_fd;
struct sockaddr_in remoteAddr;
size_t addrlen = sizeof(remoteAddr);
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
} else {
int flags = fcntl(client_fd, F_GETFL);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void acceptConnectionEpollStyle(int server_fd, int &efd) {
struct sockaddr_in new_remoteAddr;
int addrlen = sizeof(struct sockaddr_in);
while (true) {
int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen);
if (conn_sock == -1) {
// All incoming connections have been processed
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}
// make new connection non-blocking
int flags = fcntl(conn_sock, F_GETFL, 0);
fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK);
// monitor new connection for read events, always in edge triggered
struct epoll_event event;
event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
event.data.fd = conn_sock;
// Allow epoll to monitor the new connection
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) {
perror("epoll_ctl: conn_sock");
break;
}
printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock);
}
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
std::atomic<uint64_t> grandtotal_kb;
void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_flag, const int thread_index) {
epoll_event events[MAX_EVENTS];
uint64_t bytes_read = 0;
uint64_t kBytes_read = 0;
while (true) {
if (th_flag == 1) {
grandtotal_kb += kBytes_read;
break;
}
// Time measurements
///auto start = std::chrono::high_resolution_clock::now();
// Returns only the sockets for which there are events
//printf("Before wait\n");
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
//printf("After wait\n");
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
// Iterate on the sockets having events
for (int i = 0; i < nfds; i++) {
//printf("Tot fds = %d reading from %d\n", nfds, i);
int fd = events[i].data.fd;
if (fd == master_socket) {
// If the activity is on the master socket, than it's a new connection request
acceptConnectionEpollStyle(master_socket, epoll_fd);
} else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
// Than the client connection is closed, so I close it
printf("Closing %d", fd);
close(fd);
} else {
// Than we received data from one of the monitored sockets
char buffer[buf_size];
int valread = 0;
//while (valread != EAGAIN) {
valread = recv(fd, &buffer, buf_size, 0);
if (valread > 0) {
//printf("[RICEVUTO]\t FROM %d\n", fd);
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}
}
//}
}
}
///auto end = std::chrono::high_resolution_clock::now();
///double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
///
/*time_taken *= 1e-6;
total_time_taken += time_taken;
if (total_time_taken > 3e4) {
times.push_back(total_time_taken);
tot_received_data.push_back(kBytes_read);
break;
}*/
///
}
}
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int master_socket;
const int opt = 1;
master_socket = makeSocket();
//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
bindSocketPort(master_socket, port);
startListening(master_socket);
int flags = fcntl(master_socket, F_GETFL, 0);
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
epoll_event ev, events[MAX_EVENTS];
std::array<std::mutex, MAX_EVENTS> mutex_array;
std::array<uint64_t, MAX_EVENTS> kBytes_read_on_descr;
//The atomic here is used as a flag to tell the thread to stop
std::vector<std::thread> vThreads;
//create the epoll instance
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
printf("Failed to create epoll file descriptor\n");
exit(EXIT_FAILURE);
}
ev.data.fd = master_socket;
// Reading events with edge triggered mode
ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
// Allowing epoll to monitor the master_socket
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
std::vector<int> sizes;
std::vector<uint64_t> tot_received_data;
std::vector<double> times;
grandtotal_kb = 0;
int increment = 499;
for (int buf_size = 1; buf_size < 1e6 + 1; ) {
switch (buf_size) {
case 500:
increment = 500;
break;
case (int) 1e3:
increment = 1e3;
break;
case (int) 5e3:
increment = 5e3;
break;
case (int) 1e4:
increment = 1e4;
break;
case (int) 5e4:
increment = 5e4;
break;
case (int) 1e5:
increment = 1e5;
break;
case (int) 5e5:
increment = 5e5;
break;
}
printf("Next increment %d with current i: %d\n", increment, buf_size);
std::array<std::thread, 4> vT;
std::array<int, 4> thread_flags;
for (int t_i = 0; t_i < 4; t_i++) {
thread_flags[t_i] = 0;
vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), buf_size, std::cref(thread_flags.at(t_i)), t_i);
}
std::string command = "";
std::string ref("go");
if ( increment == 499){
while (command != "go") {
std::cout << "Insert command: ";
std::cin >> command;
std::cout << command << std::endl;
}
}
std::cout << "starting measurement with current buf size: " << buf_size << std::endl;
auto start = std::chrono::high_resolution_clock::now();
grandtotal_kb = 0;
sleep(30);
for (int t_i = 0; t_i < 4; t_i++) {
thread_flags[t_i] = 1;
vT[t_i].join();
}
uint64_t local_kB = grandtotal_kb;
auto end = std::chrono::high_resolution_clock::now();
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
time_taken *= 1e-6;
times.push_back(time_taken);
sizes.push_back(buf_size);
tot_received_data.push_back(local_kB);
buf_size += increment;
}
std::ofstream fout;
fout.open("epoll_data_stats_1000_multith_TIMEOUT.csv");
//the time is in milliseconds and the data in kbytes
fout << "buffer_size;time;total_received_data;\n";
auto iter_sizes = sizes.begin();
auto iter_times = times.begin();
auto iter_data = tot_received_data.begin();
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n";
}
fout.close();
if (close(epoll_fd)) {
printf("Failed to close epoll file descriptor");
exit(EXIT_FAILURE);
}
return 0;
}

View File

@ -0,0 +1,82 @@
#include <algorithm>
#include <arpa/inet.h>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <netinet/in.h>
#include <sys/socket.h>
#include <csignal>
#include <iostream>
#include <iomanip>
#include <tuple>
#include <unistd.h>
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void connectTo(int sock, const char* host, int port) {
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(host);
serv_addr.sin_port = htons(port);
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Connection failed");
exit(EXIT_FAILURE);
}
printf("Connected to %s: %d\n", host, port);
}
int main(int argc, char* argv[]) {
if (argc != 2) {
printf("Usage: ./prov.out timeout (ms)");
exit(EXIT_FAILURE);
}
int timeout = atoi(argv[1]);
//printf("Selected timeout: %f", static_cast<double>(timeout) / 1000);
int socket = makeSocket();
connectTo(socket, "127.0.0.1", 7777);
//allocating 100 megabytes of memory
uint64_t* chunky_boy = new uint64_t[67108];
size_t chunky_boy_size = 67108 * sizeof(uint64_t);
printf("chonky size %d", static_cast<int>(chunky_boy_size));
//setting memory to verify non-emptyness
memset(chunky_boy, 45678, chunky_boy_size);
int buffer_size = 1024 * 32;
for ( ;;) {
for ( int j = 0; j < chunky_boy_size; ) {
ssize_t bytes = send(socket, reinterpret_cast<char*>(chunky_boy) + j, std::min(static_cast<int>(chunky_boy_size) - j, buffer_size), 0);
if (timeout != 0) {
sleep(timeout);
}
j += buffer_size;
}
//usleep(1000);
}
return 0;
}

View File

@ -0,0 +1,6 @@
# Is epoll really thread safe?
### Motivation
Scope of this test is simply to verify that TCP communication with epoll doesen't mix data. In particular, it's important to verify that if one big chunk is sent, the TCP protocol doesen't split it into different events for level triggered epoll (ofc it's gonna be splitted due to buf size).
The important thing is that if I send 1MB of data, it doesen't wake up 2 threads listening to the same fd just because TCP split the package (afaik that shouln't happen, but it's always better to verify, since [`epoll()` is fundamentally broken](https://idea.popcount.org/2017-02-20-epoll-is-fundamentally-broken-12/))