Finished connection method and fully implemented threads

dev
MasterRoby3 2023-08-27 16:11:01 +02:00
parent 14795161ec
commit 50234b1d13
17 changed files with 525 additions and 0 deletions

View File

@ -15,3 +15,5 @@ The timeout is pretty hard, we're talking 1 second, to highlight the difference in
With epoll we see a big rise in throughput with the 2 clients, since we cut the CPU time spent iterating and make better use of the 30-second measurement window;
with 50 clients it's not as evident. With 50 clients we already start to see big improvements of epoll over select.
Now I try to multithread epoll, to see if the amount of data acquired increases. I'll measure the full bandwidth received; I expect a semi-linear increase in performance.
For the multithreaded version, check the threads folder.

View File

@ -0,0 +1,19 @@
buffer_size;time;total_received_data;
1;30001.8;121453;
500;30001.8;38315920;
1000;30013.8;60329152;
2000;30005.2;74660074;
3000;30005.9;85193274;
4000;30006.5;96597573;
5000;30008.5;106440822;
10000;30011.8;137147617;
20000;30007.9;176761989;
30000;30006.4;156482849;
40000;30014;177159351;
50000;30049.3;171875036;
100000;30053.9;224264206;
200000;30048.3;225628771;
300000;30121;180069213;
400000;30016.7;156804334;
500000;30000.6;230636978;
1000000;30135;159812471;
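To turn one of these result files into a throughput figure, the two right-hand columns are enough. A minimal sketch (not part of this commit), assuming the units the server writes out (time in milliseconds, total_received_data in kB):

```cpp
// Prints throughput per buffer size for one of the CSV files above.
// Assumes: the time column is in ms and total_received_data is in kB
// (as written by the server), so kB/ms is roughly MB/s.
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>

int main(int argc, char const *argv[]) {
    if (argc != 2) {
        std::cerr << "Usage: " << argv[0] << " stats.csv\n";
        return 1;
    }
    std::ifstream fin(argv[1]);
    std::string line;
    std::getline(fin, line); // skip the header row
    while (std::getline(fin, line)) {
        std::stringstream ss(line);
        std::string buf_size, time_ms, data_kb;
        std::getline(ss, buf_size, ';');
        std::getline(ss, time_ms, ';');
        std::getline(ss, data_kb, ';');
        if (data_kb.empty()) continue;
        std::cout << buf_size << " -> " << std::stod(data_kb) / std::stod(time_ms) << " MB/s\n";
    }
    return 0;
}
```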

View File

@ -0,0 +1,19 @@
buffer_size;time;total_received_data;
1;30002.1;140629;
500;30003.8;31440572;
1000;30004.1;48545615;
2000;30010.9;65841163;
3000;30011.9;79191486;
4000;30008.6;83061533;
5000;30011.5;82064998;
10000;30005.2;96952555;
20000;30007;132912119;
30000;30005.2;138298731;
40000;30017.7;138156188;
50000;30017.2;139215949;
100000;30051.2;139854062;
200000;30008.9;146022695;
300000;30061.5;142890809;
400000;30083.1;138006324;
500000;30074.4;133424935;
1000000;30000.5;114451148;

View File

@ -0,0 +1,19 @@
buffer_size;time;total_received_data;
1;30002.1;163986;
500;30019;34088098;
1000;30005.1;50801994;
2000;30006.2;69301651;
3000;30005.4;80399265;
4000;30012.7;90123946;
5000;30010.8;98647263;
10000;30011.7;116325174;
20000;30049.7;134726431;
30000;30025;144562322;
40000;30028.7;147823821;
50000;30012.3;146218628;
100000;30069.2;151376815;
200000;30011.5;138831476;
300000;30022.2;145632228;
400000;30096.8;146447339;
500000;30143.2;134063359;
1000000;30000.4;123787063;

Binary file not shown.

View File

@ -0,0 +1,351 @@
#include <asm-generic/errno-base.h>
#include <asm-generic/errno.h>
#include <cerrno>
#include <cstddef>
#include <cstdint>
#include <cstdio> // for fprintf()
#include <functional>
#include <iostream>
#include <ratio>
#include <unistd.h> // for close(), read()
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
#include <cstring> // for strncmp
//my addition to the online guide
#include <csignal>
#include <cstdlib>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <utility>
#include <vector>
#include <chrono>
#include <fstream>
#include <thread>
#include <mutex>
#include <atomic>
#include <array>
#define MAX_EVENTS 20000
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void bindSocketPort(int server_fd, int port) {
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("FD %d bound to port %d\n", server_fd, port);
}
void startListening(int server_fd) {
if (listen(server_fd, 20000) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
printf("FD %d listening to new connections\n", server_fd);
}
int acceptConnection(int server_fd) {
int client_fd;
struct sockaddr_in remoteAddr;
socklen_t addrlen = sizeof(remoteAddr);
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, &addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
} else {
int flags = fcntl(client_fd, F_GETFL);
fcntl(client_fd, F_SETFL, flags | O_NONBLOCK);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void acceptConnectionEpollStyle(int server_fd, int &efd) {
struct sockaddr_in new_remoteAddr;
int addrlen = sizeof(struct sockaddr_in);
while (true) {
int conn_sock = accept(server_fd, (struct sockaddr*)&new_remoteAddr, (socklen_t*)&addrlen);
if (conn_sock == -1) {
// All incoming connections have been processed
if ((errno == EAGAIN) || (errno == EWOULDBLOCK)) {
break;
} else {
perror("accept");
break;
}
}
// make new connection non-blocking
int flags = fcntl(conn_sock, F_GETFL, 0);
fcntl(conn_sock, F_SETFL, flags | O_NONBLOCK);
// monitor new connection for read events (level-triggered; EPOLLET left disabled)
struct epoll_event event;
event.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
event.data.fd = conn_sock;
// Allow epoll to monitor the new connection
if (epoll_ctl(efd, EPOLL_CTL_ADD, conn_sock, &event) == -1) {
perror("epoll_ctl: conn_sock");
break;
}
printf("Accepted epoll style connection from %s:%d from fd: %d\n", inet_ntoa(new_remoteAddr.sin_addr), ntohs(new_remoteAddr.sin_port), conn_sock);
}
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
std::atomic<uint64_t> grandtotal_kb;
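// NOTE: th_flag is a plain int written by main() while this thread reads it;
// an std::atomic<int> would make the stop signal race-free.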
void threadizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_flag, const int thread_index) {
epoll_event events[MAX_EVENTS];
uint64_t bytes_read = 0;
uint64_t kBytes_read = 0;
while (true) {
if (th_flag == 1) {
grandtotal_kb += kBytes_read;
break;
}
// Time measurements
///auto start = std::chrono::high_resolution_clock::now();
// Returns only the sockets for which there are events
//printf("Before wait\n");
int nfds = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
//printf("After wait\n");
if (nfds == -1) {
perror("epoll_wait");
exit(EXIT_FAILURE);
}
// Iterate on the sockets having events
for (int i = 0; i < nfds; i++) {
//printf("Tot fds = %d reading from %d\n", nfds, i);
int fd = events[i].data.fd;
if (fd == master_socket) {
// If the activity is on the master socket, then it's a new connection request
acceptConnectionEpollStyle(master_socket, epoll_fd);
} else if ((events[i].events & EPOLLERR) || (events[i].events & EPOLLHUP) || (!(events[i].events & EPOLLIN))) {
// Then the client connection was closed or errored, so I close it
printf("Closing %d\n", fd);
close(fd);
} else {
// Then we received data from one of the monitored sockets
char buffer[buf_size];
int valread = 0;
//while (valread != EAGAIN) {
valread = recv(fd, &buffer, buf_size, 0);
if (valread > 0) {
//printf("[RICEVUTO]\t FROM %d\n", fd);
bytes_read += valread;
int kilos = 0;
if ((kilos = bytes_read / 1024) > 0) {
kBytes_read += kilos;
bytes_read -= (kilos * 1024);
//printf("reade bites %lu", bytes_read);
}
}
//}
}
}
///auto end = std::chrono::high_resolution_clock::now();
///double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
///
/*time_taken *= 1e-6;
total_time_taken += time_taken;
if (total_time_taken > 3e4) {
times.push_back(total_time_taken);
tot_received_data.push_back(kBytes_read);
break;
}*/
///
}
}
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int master_socket;
const int opt = 1;
master_socket = makeSocket();
//set SO_REUSEADDR on the master socket so the address can be reused right away;
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
bindSocketPort(master_socket, port);
startListening(master_socket);
int flags = fcntl(master_socket, F_GETFL, 0);
fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
epoll_event ev, events[MAX_EVENTS];
std::array<std::mutex, MAX_EVENTS> mutex_array; // currently unused
std::array<uint64_t, MAX_EVENTS> kBytes_read_on_descr; // currently unused
//The per-thread stop flags are set up inside the measurement loop below
std::vector<std::thread> vThreads; // currently unused
//create the epoll instance
int epoll_fd = epoll_create1(0);
if (epoll_fd == -1) {
printf("Failed to create epoll file descriptor\n");
exit(EXIT_FAILURE);
}
ev.data.fd = master_socket;
// Reading events in level-triggered mode (EPOLLET left disabled)
ev.events = EPOLLIN | EPOLLEXCLUSIVE;//| EPOLLET;
// Allowing epoll to monitor the master_socket
if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, master_socket, &ev) == -1){
perror("epoll_ctl");
exit(EXIT_FAILURE);
}
std::vector<int> sizes;
std::vector<uint64_t> tot_received_data;
std::vector<double> times;
grandtotal_kb = 0;
int increment = 499;
for (int buf_size = 1; buf_size < 1e6 + 1; ) {
switch (buf_size) {
case 500:
increment = 500;
break;
case (int) 1e3:
increment = 1e3;
break;
case (int) 5e3:
increment = 5e3;
break;
case (int) 1e4:
increment = 1e4;
break;
case (int) 5e4:
increment = 5e4;
break;
case (int) 1e5:
increment = 1e5;
break;
case (int) 5e5:
increment = 5e5;
break;
}
printf("Next increment %d with current i: %d\n", increment, buf_size);
std::array<std::thread, 4> vT;
std::array<int, 4> thread_flags;
for (int t_i = 0; t_i < 4; t_i++) {
thread_flags[t_i] = 0;
vT[t_i] = std::thread(threadizable, std::ref(epoll_fd), std::ref(master_socket), buf_size, std::cref(thread_flags.at(t_i)), t_i);
}
std::string command = "";
std::string ref("go");
if ( increment == 499){
while (command != "go") {
std::cout << "Insert command: ";
std::cin >> command;
std::cout << command << std::endl;
}
}
std::cout << "starting measurement with current buf size: " << buf_size << std::endl;
auto start = std::chrono::high_resolution_clock::now();
grandtotal_kb = 0;
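// let the workers receive for ~30 seconds, then raise each stop flag and join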
sleep(30);
for (int t_i = 0; t_i < 4; t_i++) {
thread_flags[t_i] = 1;
vT[t_i].join();
}
uint64_t local_kB = grandtotal_kb;
auto end = std::chrono::high_resolution_clock::now();
double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
//time taken in milliseconds
time_taken *= 1e-6;
times.push_back(time_taken);
sizes.push_back(buf_size);
tot_received_data.push_back(local_kB);
buf_size += increment;
}
std::ofstream fout;
fout.open("epoll_data_stats_1000_multith_TIMEOUT.csv");
//the time is in milliseconds and the data in kbytes
fout << "buffer_size;time;total_received_data;\n";
auto iter_sizes = sizes.begin();
auto iter_times = times.begin();
auto iter_data = tot_received_data.begin();
for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
fout << *iter_sizes << ";" << *iter_times << ";" << *iter_data << ";\n";
}
fout.close();
if (close(epoll_fd)) {
printf("Failed to close epoll file descriptor");
exit(EXIT_FAILURE);
}
return 0;
}

Binary file not shown.

View File

@ -0,0 +1,82 @@
#include <algorithm>
#include <arpa/inet.h>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <netinet/in.h>
#include <sys/socket.h>
#include <csignal>
#include <iostream>
#include <iomanip>
#include <tuple>
#include <unistd.h>
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void connectTo(int sock, const char* host, int port) {
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(host);
serv_addr.sin_port = htons(port);
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Connection failed");
exit(EXIT_FAILURE);
}
printf("Connected to %s: %d\n", host, port);
}
int main(int argc, char* argv[]) {
if (argc != 2) {
printf("Usage: ./prov.out timeout (ms)");
exit(EXIT_FAILURE);
}
int timeout = atoi(argv[1]);
//printf("Selected timeout: %f", static_cast<double>(timeout) / 1000);
int socket = makeSocket();
connectTo(socket, "127.0.0.1", 7777);
//allocating ~0.5 MB of memory (67108 * 8 bytes)
uint64_t* chunky_boy = new uint64_t[67108];
size_t chunky_boy_size = 67108 * sizeof(uint64_t);
printf("chonky size %d\n", static_cast<int>(chunky_boy_size));
//setting memory to verify non-emptiness (memset only uses the low byte of 45678)
memset(chunky_boy, 45678, chunky_boy_size);
int buffer_size = 1024 * 32;
for ( ;;) {
for ( int j = 0; j < chunky_boy_size; ) {
ssize_t bytes = send(socket, reinterpret_cast<char*>(chunky_boy) + j, std::min(static_cast<int>(chunky_boy_size) - j, buffer_size), 0);
if (timeout != 0) {
sleep(timeout);
}
j += buffer_size;
}
//usleep(1000);
}
return 0;
}

View File

@ -0,0 +1,18 @@
# Multithreaded epoll()
### Not a naïve implementation
To correctly set up a multithreaded `epoll()` it's necessary to set some specific flags. We'll consider level-triggered mode, since that's what we're using. Since we'll split the requests across different worker threads, I expect the performance to scale semi-linearly with the number of workers.
The provider implementation doesn't change. I'll spawn providers with no timeout.
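For reference, the flag setup this boils down to in the server of this commit is sketched here (condensed; `addToEpoll` is just an illustrative name, the server inlines the equivalent calls):

```cpp
#include <sys/epoll.h>
#include <cstdio>
#include <cstdlib>

// Register a socket on the shared epoll instance: level-triggered read events
// with EPOLLEXCLUSIVE, EPOLLET deliberately left out. Every worker thread then
// blocks on the same epoll fd with epoll_wait().
void addToEpoll(int epoll_fd, int fd) {
    struct epoll_event ev;
    ev.data.fd = fd;
    ev.events = EPOLLIN | EPOLLEXCLUSIVE;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, fd, &ev) == -1) {
        perror("epoll_ctl");
        exit(EXIT_FAILURE);
    }
}
```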
#### Results
With 4 threads we see a ~1.5x to ~3x increase in performance without timeout. In production we can maximize CPU cores by using 8-n-1 threads for reading, where 1 is the main thread and n is the number of threads used to build the full event (probably n=1).
With a timeout, I don't see improvement with TIMEOUT_HARD (only 2 clients sending continuously), and actually somewhat worse performance in some cases. This is probably due to EPOLLEXCLUSIVE, which wastes time distributing work across more threads.
With TIMEOUT (50 clients sending continuously) I get some improvement, but not comparable to the all-continuous case. That's probably because I'm hitting a tradeoff between the execution time spent on incoming connections and the time needed to distribute the load across threads.
In an environment with a lot of clients and short timeouts, the approach with more than one thread is clearly preferable for scalability reasons (50 continuous + 950 with a 1-second timeout already shows improvement).
In such an environment, a multithread instance should use mutexes on the read data to avoid reordering of the received data. It's going to be explained in detail during the presentations; a rough sketch follows.
(that's something to study a little better)
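As an illustration of that idea (not part of this commit; `ConnState` and `per_fd_state` are invented names), each connection could get its own mutex held across the `recv()` and the append, so the bytes of one connection stay in arrival order even when several workers poll the same epoll fd:

```cpp
#include <sys/socket.h>
#include <map>
#include <mutex>
#include <vector>

struct ConnState {
    std::mutex m;            // serializes recv + append for this connection
    std::vector<char> data;  // bytes received so far, in arrival order
};

std::mutex map_mutex;                   // protects the map itself
std::map<int, ConnState> per_fd_state;  // one entry per connection fd

void readOrdered(int fd, int buf_size) {
    ConnState *st;
    {
        std::lock_guard<std::mutex> lk(map_mutex);
        st = &per_fd_state[fd];  // std::map nodes stay valid across inserts
    }
    std::vector<char> buffer(buf_size);
    std::lock_guard<std::mutex> lk(st->m);
    ssize_t n = recv(fd, buffer.data(), buf_size, 0);
    if (n > 0) {
        st->data.insert(st->data.end(), buffer.begin(), buffer.begin() + n);
    }
}
```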

View File

@ -0,0 +1,15 @@
#!/bin/bash
echo "Usage: $0 host port runNumber numberOfProviders"
if [ $# -eq 1 ]
then
for i in $(seq 1 $1);
do
echo "Spawning provider number $i"
if [ $i -le 50 ]
then
./prov.out 1&
else
./prov.out 0&
fi
done
fi