Fully implemented multi_th test and got test results
							parent
							
								
									a287ea3d92
								
							
						
					
					
						commit
						ac02149c6d
					
				
										
											Binary file not shown.
										
									
								
							| 
						 | 
				
			
			@ -1,11 +1,11 @@
 | 
			
		|||
#include <asm-generic/errno-base.h>
 | 
			
		||||
#include <asm-generic/errno.h>
 | 
			
		||||
#include <cerrno>
 | 
			
		||||
#include <cstddef>
 | 
			
		||||
#include <cinttypes>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <cstdio>     // for fprintf()
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <iostream>
 | 
			
		||||
#include <queue>
 | 
			
		||||
#include <ratio>
 | 
			
		||||
#include <unistd.h>    // for close(), read()
 | 
			
		||||
#include <sys/epoll.h> // for epoll_create1(), epoll_ctl(), struct epoll_event
 | 
			
		||||
| 
						 | 
				
			
			@ -121,8 +121,11 @@ void term_handler(int signal) {
 | 
			
		|||
 | 
			
		||||
std::atomic<uint64_t> grandtotal_kb;
 | 
			
		||||
 | 
			
		||||
std::queue<std::pair<int, uint64_t>> data_queue;
 | 
			
		||||
std::mutex queue_mutex;
 | 
			
		||||
 | 
			
		||||
void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_flag, const int thread_index) {
 | 
			
		||||
 | 
			
		||||
void thradizable(int &epoll_fd, int &master_socket, const int &th_flag, const int thread_index) {
 | 
			
		||||
    epoll_event events[MAX_EVENTS];
 | 
			
		||||
    uint64_t bytes_read = 0;
 | 
			
		||||
    uint64_t kBytes_read = 0;
 | 
			
		||||
| 
						 | 
				
			
			@ -144,6 +147,8 @@ void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_
 | 
			
		|||
            exit(EXIT_FAILURE);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        
 | 
			
		||||
 | 
			
		||||
        // Iterate on the sockets having events
 | 
			
		||||
        for (int i = 0; i < nfds; i++) {
 | 
			
		||||
            //printf("Tot fds = %d reading from %d\n", nfds, i);
 | 
			
		||||
| 
						 | 
				
			
			@ -157,21 +162,34 @@ void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_
 | 
			
		|||
                printf("Closing %d", fd);
 | 
			
		||||
                close(fd);
 | 
			
		||||
            } else {
 | 
			
		||||
                uint64_t part = grandtotal_kb;
 | 
			
		||||
                //printf("Ev trig th %d with tot b %lu\n", thread_index, part);
 | 
			
		||||
                // Than we received data from one of the monitored sockets
 | 
			
		||||
                char buffer[buf_size];
 | 
			
		||||
                uint64_t buffer[8];
 | 
			
		||||
                int valread = 0;
 | 
			
		||||
                //while (valread != EAGAIN) {
 | 
			
		||||
                    valread = recv(fd, &buffer, buf_size, 0);
 | 
			
		||||
                    std::unique_lock<std::mutex> lk(queue_mutex);
 | 
			
		||||
                    valread = recv(fd, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
 | 
			
		||||
                    if (valread > 0) {
 | 
			
		||||
                        //printf("[RICEVUTO]\t FROM %d\n", fd);
 | 
			
		||||
 | 
			
		||||
                        int opt_incr = (valread % 8) ? 1 : 0;
 | 
			
		||||
                        for (int q_i = 0; q_i < (valread/8) + opt_incr; q_i++) {
 | 
			
		||||
                            data_queue.push(std::pair<int, uint64_t>(thread_index, buffer[q_i]));
 | 
			
		||||
                        }
 | 
			
		||||
                        grandtotal_kb += valread;
 | 
			
		||||
                        
 | 
			
		||||
 | 
			
		||||
                        /*
 | 
			
		||||
                        bytes_read += valread;
 | 
			
		||||
                        int kilos = 0;
 | 
			
		||||
                        if ((kilos = bytes_read / 1024) > 0) {
 | 
			
		||||
                            kBytes_read += kilos;
 | 
			
		||||
                            bytes_read -= (kilos * 1024);
 | 
			
		||||
                            //printf("reade bites %lu", bytes_read);
 | 
			
		||||
                        }*/
 | 
			
		||||
                    }
 | 
			
		||||
                    }
 | 
			
		||||
                    lk.unlock();
 | 
			
		||||
                //}
 | 
			
		||||
 | 
			
		||||
            }
 | 
			
		||||
| 
						 | 
				
			
			@ -195,6 +213,35 @@ void thradizable(int &epoll_fd, int &master_socket, int buf_size, const int &th_
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
std::vector<uint64_t> popped;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
void printer_thread() {
 | 
			
		||||
    while (1) {
 | 
			
		||||
        std::unique_lock<std::mutex> lk(queue_mutex);
 | 
			
		||||
        if (!data_queue.empty()) {
 | 
			
		||||
            std::pair<int, uint64_t> element = data_queue.front();
 | 
			
		||||
            data_queue.pop();
 | 
			
		||||
            popped.push_back(element.second);
 | 
			
		||||
            //printf("Element: %lu from thread %d\n", element.second, element.first);
 | 
			
		||||
 | 
			
		||||
            if (element.second == 999) {
 | 
			
		||||
                int fail_flag = 0;
 | 
			
		||||
                for (int i = 1; i < 999; i++) {
 | 
			
		||||
                    if (popped[i] == i) continue;
 | 
			
		||||
                    else fail_flag = 1;
 | 
			
		||||
                }
 | 
			
		||||
                
 | 
			
		||||
                if (fail_flag == 1) printf("FAILURE\n");
 | 
			
		||||
                else printf("success\n");
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        lk.unlock();
 | 
			
		||||
        usleep(10);
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
int main(int argc, char const *argv[]) {
 | 
			
		||||
 | 
			
		||||
    signal(SIGTERM, term_handler);
 | 
			
		||||
| 
						 | 
				
			
			@ -229,7 +276,6 @@ int main(int argc, char const *argv[]) {
 | 
			
		|||
    fcntl(master_socket, F_SETFL, flags | O_NONBLOCK);
 | 
			
		||||
 | 
			
		||||
    epoll_event ev, events[MAX_EVENTS];
 | 
			
		||||
    std::array<std::mutex, MAX_EVENTS> mutex_array;
 | 
			
		||||
    std::array<uint64_t, MAX_EVENTS> kBytes_read_on_descr;
 | 
			
		||||
 | 
			
		||||
    //The atomic here is used as a flag to tell the thread to stop
 | 
			
		||||
| 
						 | 
				
			
			@ -252,95 +298,22 @@ int main(int argc, char const *argv[]) {
 | 
			
		|||
        exit(EXIT_FAILURE);
 | 
			
		||||
    }
 | 
			
		||||
    
 | 
			
		||||
    std::vector<int> sizes;
 | 
			
		||||
    std::vector<uint64_t> tot_received_data;
 | 
			
		||||
    std::vector<double> times; 
 | 
			
		||||
    std::array<std::thread, 2> vT;
 | 
			
		||||
    std::array<int, 2> thread_flags;
 | 
			
		||||
    
 | 
			
		||||
    grandtotal_kb = 0;
 | 
			
		||||
    int increment = 499;
 | 
			
		||||
 | 
			
		||||
    for (int buf_size = 1; buf_size < 1e6 + 1; ) {
 | 
			
		||||
        switch (buf_size) {
 | 
			
		||||
            case 500:
 | 
			
		||||
                increment = 500;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 1e3:
 | 
			
		||||
                increment = 1e3;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 5e3:
 | 
			
		||||
                increment = 5e3;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 1e4:
 | 
			
		||||
                increment = 1e4;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 5e4:
 | 
			
		||||
                increment = 5e4;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 1e5:
 | 
			
		||||
                increment = 1e5;
 | 
			
		||||
                break;
 | 
			
		||||
            case (int) 5e5:
 | 
			
		||||
                increment = 5e5;
 | 
			
		||||
                break;
 | 
			
		||||
        }
 | 
			
		||||
        printf("Next increment %d with current i: %d\n", increment, buf_size);
 | 
			
		||||
    
 | 
			
		||||
        std::array<std::thread, 4> vT;
 | 
			
		||||
        std::array<int, 4> thread_flags;
 | 
			
		||||
 | 
			
		||||
        for (int t_i = 0; t_i < 4; t_i++) {
 | 
			
		||||
    for (int t_i = 0; t_i < 2; t_i++) {
 | 
			
		||||
        thread_flags[t_i] = 0;
 | 
			
		||||
            vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), buf_size, std::cref(thread_flags.at(t_i)), t_i);
 | 
			
		||||
        vT[t_i] = std::thread(thradizable, std::ref(epoll_fd), std::ref(master_socket), std::cref(thread_flags.at(t_i)), t_i);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        std::string command = "";
 | 
			
		||||
        std::string ref("go");
 | 
			
		||||
        if ( increment == 499){
 | 
			
		||||
            while (command != "go") {
 | 
			
		||||
                std::cout << "Insert command: ";
 | 
			
		||||
                std::cin >> command;
 | 
			
		||||
                std::cout << command << std::endl;
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
    std::thread printer(printer_thread);
 | 
			
		||||
 | 
			
		||||
        std::cout << "starting measurement with current buf size: " << buf_size << std::endl;
 | 
			
		||||
 | 
			
		||||
        auto start = std::chrono::high_resolution_clock::now();
 | 
			
		||||
        grandtotal_kb = 0;
 | 
			
		||||
 | 
			
		||||
        sleep(30);
 | 
			
		||||
        for (int t_i = 0; t_i < 4; t_i++) {
 | 
			
		||||
            thread_flags[t_i] = 1;
 | 
			
		||||
            vT[t_i].join();
 | 
			
		||||
        }
 | 
			
		||||
        uint64_t local_kB = grandtotal_kb;
 | 
			
		||||
        auto end = std::chrono::high_resolution_clock::now();
 | 
			
		||||
        double time_taken = std::chrono::duration_cast<std::chrono::nanoseconds>(end - start).count();
 | 
			
		||||
        //time taken in milliseconds
 | 
			
		||||
        time_taken *= 1e-6;
 | 
			
		||||
 | 
			
		||||
        times.push_back(time_taken);
 | 
			
		||||
        sizes.push_back(buf_size);
 | 
			
		||||
        tot_received_data.push_back(local_kB);
 | 
			
		||||
 | 
			
		||||
        buf_size += increment;
 | 
			
		||||
    }
 | 
			
		||||
    printer.join();
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    std::ofstream fout;
 | 
			
		||||
    fout.open("epoll_data_stats_1000_multith_TIMEOUT.csv");
 | 
			
		||||
    //the time is in milliseconds and the data in kbytes
 | 
			
		||||
    fout << "buffer_size;time;total_received_data;\n";
 | 
			
		||||
    auto iter_sizes = sizes.begin();
 | 
			
		||||
    auto iter_times = times.begin();
 | 
			
		||||
    auto iter_data = tot_received_data.begin();
 | 
			
		||||
 | 
			
		||||
    for ( ; (iter_sizes != sizes.end()) && (iter_times != times.end()) && (iter_data != tot_received_data.end()) ; (++iter_sizes, ++iter_times, ++iter_data) ) {
 | 
			
		||||
        fout << *iter_sizes << ";" << *iter_times <<  ";" << *iter_data << ";\n";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    fout.close();
 | 
			
		||||
 | 
			
		||||
    if (close(epoll_fd)) {
 | 
			
		||||
        printf("Failed to close epoll file descriptor");
 | 
			
		||||
| 
						 | 
				
			
			@ -0,0 +1,14 @@
 | 
			
		|||
#!/bin/bash
 | 
			
		||||
 | 
			
		||||
for i in $(seq 1 1000);
 | 
			
		||||
do
 | 
			
		||||
    echo "exec $i" >> res.txt
 | 
			
		||||
    ./ev_builder.out 7777 >> res.txt &
 | 
			
		||||
    sleep 1
 | 
			
		||||
    ./prov.out &
 | 
			
		||||
    sleep 3
 | 
			
		||||
    kill -15 `pidof ./ev_builder.out`
 | 
			
		||||
    kill -15 `pidof ./prov.out`
 | 
			
		||||
    sleep 1
 | 
			
		||||
done
 | 
			
		||||
 | 
			
		||||
										
											Binary file not shown.
										
									
								
							| 
						 | 
				
			
			@ -15,6 +15,8 @@
 | 
			
		|||
#include <unistd.h>
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
#define BIGDIM 10000
 | 
			
		||||
 | 
			
		||||
int makeSocket() {
 | 
			
		||||
    int sockfd;
 | 
			
		||||
    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 | 
			
		||||
| 
						 | 
				
			
			@ -39,44 +41,22 @@ void connectTo(int sock, const char* host, int port) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
int main(int argc, char* argv[]) {
 | 
			
		||||
 | 
			
		||||
     if (argc != 2) {
 | 
			
		||||
        printf("Usage: ./prov.out timeout (ms)");
 | 
			
		||||
        exit(EXIT_FAILURE);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    int timeout = atoi(argv[1]);
 | 
			
		||||
    //printf("Selected timeout: %f", static_cast<double>(timeout) / 1000);
 | 
			
		||||
 | 
			
		||||
    int socket = makeSocket();
 | 
			
		||||
    connectTo(socket, "127.0.0.1", 7777);
 | 
			
		||||
 | 
			
		||||
    //allocating 100 megabytes of memory
 | 
			
		||||
    uint64_t* chunky_boy = new uint64_t[67108];
 | 
			
		||||
    size_t chunky_boy_size = 67108 * sizeof(uint64_t);
 | 
			
		||||
    printf("chonky size %d", static_cast<int>(chunky_boy_size));
 | 
			
		||||
    uint64_t alotofvalues[BIGDIM];
 | 
			
		||||
 | 
			
		||||
    //setting memory to verify non-emptyness
 | 
			
		||||
    memset(chunky_boy, 45678, chunky_boy_size);
 | 
			
		||||
 | 
			
		||||
    int buffer_size = 1024 * 32;
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    for ( ;;) {
 | 
			
		||||
        
 | 
			
		||||
        for ( int j = 0; j < chunky_boy_size; ) {
 | 
			
		||||
            
 | 
			
		||||
            ssize_t bytes = send(socket, reinterpret_cast<char*>(chunky_boy) + j, std::min(static_cast<int>(chunky_boy_size) - j, buffer_size), 0);
 | 
			
		||||
            if (timeout != 0) {
 | 
			
		||||
                sleep(timeout);
 | 
			
		||||
            }
 | 
			
		||||
            j += buffer_size;
 | 
			
		||||
    for (int i = 1; i < BIGDIM; i++) {
 | 
			
		||||
        alotofvalues[i] = i;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
        //usleep(1000);
 | 
			
		||||
 | 
			
		||||
    int send_counter = 0;
 | 
			
		||||
    for (;;) {
 | 
			
		||||
        alotofvalues[0] = send_counter;
 | 
			
		||||
        ssize_t bytes = send(socket, reinterpret_cast<char*>(alotofvalues), sizeof(alotofvalues), 0);
 | 
			
		||||
        sleep(10000);
 | 
			
		||||
        send_counter++;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    return 0;
 | 
			
		||||
}
 | 
			
		||||
										
											
												File diff suppressed because it is too large
												Load Diff
											
										
									
								
							
		Loading…
	
		Reference in New Issue