Backup added

dev
MasterRoby3 2023-09-03 21:20:58 +02:00
parent 55d3a9baa3
commit 583d6fdae0
15 changed files with 605 additions and 1 deletions

View File

@ -0,0 +1,44 @@
#include "ControlledQueue.h"
#include <chrono>
#include <mutex>
ControlledQueue::ControlledQueue(uint32_t maxSize, int timeoutMicroseconds) : m_maxSize(maxSize), m_timeoutMicroseconds(timeoutMicroseconds) {;}
void ControlledQueue::put(Fragment fragment) {
std::unique_lock<std::mutex> lk(m_mtx);
/*
This while synthax is explicitly used to avoid unwanted spurious awakenings.
I'm keeping only the wait with not timeout to transmit backpressure to the upper reading thread, which blocks.
Since that thread has locked the mutex corresponding to the fd, basically the whole fd is blocked.
That way I can tranmit backpressure with TCP to the sending client.
*/
while (! (m_queue.size() < m_maxSize) ) {
m_cv.wait(lk);
}
m_queue.push(fragment);
lk.unlock();
m_cv.notify_all();
}
/*
Basically here a simple string exception is thrown if the wait terminates because of timeout and not because someone has inserted an element.
That way it can be catched from the main program that can set the error code accordingly.
*/
Fragment ControlledQueue::get() {
std::unique_lock<std::mutex> lk(m_mtx);
if ( !m_cv.wait_for(lk, std::chrono::microseconds(m_timeoutMicroseconds), !(m_queue.empty())) ) {
throw "Get Timeout";
}
Fragment fragment = m_queue.front();
m_queue.pop();
lk.unlock();
m_cv.notify_all();
return fragment;
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <cstdint>
#include <mutex>
#include <condition_variable>
#include <queue>
#include "fragment_dataformat.h"
class ControlledQueue {
public:
ControlledQueue(uint32_t maxSize, int timeoutMicroseconds);
void put(Fragment fragment);
Fragment get();
bool empty() { return m_queue.empty(); }
private:
std::mutex m_mtx;
std::condition_variable m_cv;
uint32_t m_maxSize;
std::queue<Fragment> m_queue;
int m_timeoutMicroseconds;
};

View File

@ -0,0 +1,8 @@
{
"folders": [
{
"path": ".."
}
],
"settings": {}
}

2
backup/README.md 100644
View File

@ -0,0 +1,2 @@
# Event Builder Project
This repo contains the code for a simple Event Builder inspired by ATLAS event format. It's a project for the exam "Tecniche Digitali di Acquisizione Dati" @UNIPV.

BIN
backup/a.out 100755

Binary file not shown.

BIN
backup/core 100644

Binary file not shown.

BIN
backup/evBuild.out 100755

Binary file not shown.

View File

@ -0,0 +1,244 @@
#include <arpa/inet.h>
#include <csignal>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <netinet/in.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <sys/time.h>
#include <errno.h>
#include <queue>
#include <vector>
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void bindSocketPort(int server_fd, int port) {
struct sockaddr_in localAddr;
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = INADDR_ANY;
localAddr.sin_port = htons(port);
if (bind(server_fd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
perror("bind failed");
exit(EXIT_FAILURE);
}
printf("FD %d bound to port %d\n", server_fd, port);
}
void startListening(int server_fd) {
if (listen(server_fd, 3) < 0) {
perror("listen");
exit(EXIT_FAILURE);
}
printf("FD %d listening to new connections\n", server_fd);
}
int acceptConnection(int server_fd) {
int client_fd;
struct sockaddr_in remoteAddr;
size_t addrlen = sizeof(remoteAddr);
if ((client_fd = accept(server_fd, (struct sockaddr *)&remoteAddr, (socklen_t *)&addrlen)) < 0) {
perror("accept");
exit(EXIT_FAILURE);
}
printf("Connection from host %s, port %d, FD %d\n", inet_ntoa(remoteAddr.sin_addr), ntohs(remoteAddr.sin_port), client_fd);
return client_fd;
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
/*
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int server_fd = makeSocket();
bindSocketPort(server_fd, port);
startListening(server_fd);
int client_fd = acceptConnection(server_fd);
while (true) {
uint32_t word;
ssize_t bytes = read(client_fd, &word, 4);
if (bytes != 4) {
perror("Receive failed");
exit(EXIT_FAILURE);
}
printf("[RICEVUTO]\t0x%x\n", word);
}
return 0;
}*/
#define TRUE 1
#define FALSE 0
int main(int argc, char const *argv[]) {
signal(SIGTERM, term_handler);
if (argc != 2) {
printf("Usage: %s portNumber \n", argv[0]);
exit(EXIT_FAILURE);
}
int port = atoi(argv[1]);
printf("Start socket port %d\n", port);
int opt = TRUE;
int master_socket , addrlen , new_socket , client_socket[30] ,
max_clients = 30 , activity, i , valread , sd;
int max_sd;
//set of socket descriptors
fd_set readfds;
//initialise all client_socket[] to 0 so not checked
for (i = 0; i < max_clients; i++)
{
client_socket[i] = 0;
}
master_socket = makeSocket();
//set master socket to allow multiple connections ,
//this is just a good habit, it will work without this
if( setsockopt(master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
sizeof(opt)) < 0 )
{
perror("setsockopt");
exit(EXIT_FAILURE);
}
bindSocketPort(master_socket, port);
startListening(master_socket);
while (true) {
//clear the socket set
FD_ZERO(&readfds);
//add master socket to set
FD_SET(master_socket, &readfds);
max_sd = master_socket;
//add child sockets to set
for ( i = 0 ; i < max_clients ; i++)
{
//socket descriptor
sd = client_socket[i];
//if valid socket descriptor then add to read list
if(sd > 0)
FD_SET( sd , &readfds);
//highest file descriptor number, need it for the select function
if(sd > max_sd)
max_sd = sd;
}
//wait for an activity on one of the sockets , timeout is NULL ,
//so wait indefinitely
activity = select( max_sd + 1 , &readfds , NULL , NULL , NULL);
if ((activity < 0) && (errno!=EINTR))
{
printf("select error");
}
//If something happened on the master socket ,
//then its an incoming connection
if (FD_ISSET(master_socket, &readfds))
{
new_socket = acceptConnection(master_socket);
//add new socket to array of sockets
for (i = 0; i < max_clients; i++)
{
//if position is empty
if( client_socket[i] == 0 )
{
client_socket[i] = new_socket;
printf("Adding to list of sockets as %d\n" , i);
break;
}
}
}
//else its some IO operation on some other socket
for (i = 0; i < max_clients; i++)
{
sd = client_socket[i];
if (FD_ISSET( sd , &readfds))
{
//Check if it was for closing , and also read the
//incoming message
uint32_t word;
if ((valread = recv( sd , &word, 4, 0)) == 0)
{
struct sockaddr_in address;
int addrlen;
//Somebody disconnected , get his details and print
getpeername(sd , (struct sockaddr*)&address , \
(socklen_t*)&addrlen);
printf("Host disconnected , ip %s , port %d \n" ,
inet_ntoa(address.sin_addr) , ntohs(address.sin_port));
printf("Disconnected fd %d", sd);
//Close the socket and mark as 0 in list for reuse
close( sd );
client_socket[i] = 0;
}
//Echo back the message that came in
else
{
printf("[RICEVUTO]\t0x%x FROM %d\n", word, sd);
}
}
}
}
/*
int client_fd = acceptConnection(server_fd);
while (true) {
uint32_t word;
ssize_t bytes = read(client_fd, &word, 4);
if (bytes != 4) {
perror("Receive failed");
exit(EXIT_FAILURE);
}
printf("[RICEVUTO]\t0x%x\n", word);
}*/
return 0;
}

View File

@ -0,0 +1,85 @@
#pragma once
#include <cstddef>
#include <cstdint>
#include <cstring>
#define FRAGMENT_HEADER_MARKER 0xee1234ee
typedef struct Header {
uint32_t startOfHeaderMarker = FRAGMENT_HEADER_MARKER;
uint32_t headerSize;
uint32_t fragmentSize;
uint32_t sourceIdentifier;
uint32_t runNumber;
uint32_t detectorEventNumber;
uint32_t numberOfStatusElements;
uint32_t *statusElementsArray;
/*friend std::ostream& operator <<(std::ostream& os, Header const& header)
{
return os << std::setw(8) << std::setfill('0') << header.startOfHeaderMarker << '\n'
<< std::setw(8) << std::setfill('0') << header.headerSize << '\n'
<< std::setw(8) << std::setfill('0') << header.fragmentSize << '\n'
<< std::setw(8) << std::setfill('0') << header.runNumber << '\n'
<< std::setw(8) << std::setfill('0') << header.detectorEventNumber << '\n'
<< std::setw(8) << std::setfill('0') << header.numberOfStatusElements << '\n';
}*/
} Header;
typedef struct Fragment {
Header header;
uint32_t *payloadElements;
} Fragment;
enum ERROR_CODES {
INCORRECT_ERROR = (1 << 0),
CORRUPTED_ERROR = (1 << 1),
MISSING_DATA_ERROR = (1 << 2),
TIMEOUT_ERROR = (1 << 3)
};
void encode_header(uint32_t *buffer, const Header &header) {
buffer[0] = header.startOfHeaderMarker;
buffer[1] = header.headerSize;
buffer[2] = header.fragmentSize;
buffer[3] = header.sourceIdentifier;
buffer[4] = header.runNumber;
buffer[5] = header.detectorEventNumber;
buffer[6] = header.numberOfStatusElements;
std::memcpy(&buffer[7], header.statusElementsArray, header.numberOfStatusElements * sizeof(uint32_t));
}
void encode_fragment(uint32_t *buffer, const Fragment &fragment){
encode_header(buffer, fragment.header);
std::memcpy(&buffer[fragment.header.headerSize], fragment.payloadElements, (fragment.header.fragmentSize - fragment.header.headerSize) * sizeof(uint32_t));
}
Fragment decode_fragment(uint32_t *buffer) {
Fragment fragment;
fragment.header.startOfHeaderMarker = buffer[0];
fragment.header.headerSize = buffer[1];
fragment.header.fragmentSize = buffer[2];
fragment.header.sourceIdentifier = buffer[3];
fragment.header.runNumber = buffer[4];
fragment.header.detectorEventNumber = buffer[5];
fragment.header.numberOfStatusElements = buffer[6];
uint32_t nStatusElements = fragment.header.numberOfStatusElements;
fragment.header.statusElementsArray = new uint32_t[nStatusElements];
for (int i = 1; i <= nStatusElements; i++) {
fragment.header.statusElementsArray[6+i] = buffer[6+i];
}
uint32_t payload_size = fragment.header.fragmentSize - fragment.header.headerSize;
fragment.payloadElements = new uint32_t[payload_size];
for (int i = 1; i <= payload_size; i++) {
fragment.payloadElements[6+nStatusElements+i] = buffer[6+nStatusElements+i];
}
return fragment;
}

View File

@ -0,0 +1,16 @@
#pragma once
#include "fragment_dataformat.h"
#include <cstdint>
#define FULL_EVENT_HEADER_MARKER 0xaa1234aa
typedef struct FullEvent {
uint32_t startOfHeaderMarker = FULL_EVENT_HEADER_MARKER;
uint32_t headerSize;
uint32_t eventSize;
uint32_t runNumber;
Fragment *fragmentsArray;
}FullEvent;

BIN
backup/prov.out 100755

Binary file not shown.

5
backup/prova.cpp 100644
View File

@ -0,0 +1,5 @@
#include <iostream>
int main() {
return 0;
}

161
backup/provider.cxx 100644
View File

@ -0,0 +1,161 @@
#include <algorithm>
#include <arpa/inet.h>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <iomanip>
#include <netinet/in.h>
#include <random>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <csignal>
#include "fragment_dataformat.h"
#define MIN_FRAGMENTS 0
#define MAX_FRAGMENTS 100
/*
int rndError(float p){
float rndFloat = static_cast<float>(rand()) / static_cast<float>(RAND_MAX);
return rndFloat < p;
}*/
int makeSocket() {
int sockfd;
if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
perror("socket failed");
exit(EXIT_FAILURE);
}
return sockfd;
}
void connectTo(int sock, const char* host, int port) {
struct sockaddr_in serv_addr;
serv_addr.sin_family = AF_INET;
serv_addr.sin_addr.s_addr = inet_addr(host);
serv_addr.sin_port = htons(port);
if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) {
perror("Connection failed");
exit(EXIT_FAILURE);
}
printf("Connected to %s: %d\n", host, port);
}
void term_handler(int signal) {
printf("Terminated, received SIGNAL %d", signal);
exit(EXIT_SUCCESS);
}
int main(int argc, char* argv[]) {
signal(SIGTERM, term_handler);
signal(SIGINT, term_handler);
//will use it later to obtain random number from hardware
std::random_device rd;
std::mt19937 generator(42);
std::discrete_distribution<uint8_t> discrete_distr({0.9, 0.1});
std::normal_distribution<float> normal_distr(25., 10.);
std::uniform_int_distribution<uint32_t> unif_int_distib;
std::uniform_real_distribution<float> unif_float_distrib(0.5, 2);
//commenting all old_style generation
//srand (static_cast <unsigned> (42));
bool out_condition = true;
uint32_t event_number = 0;
uint32_t run_number = 0;
uint32_t source_id = 0;
if (argc != 5) {
printf("Usage %s host portNumber sourceId runNumber\n", argv[0]);
exit(EXIT_FAILURE);
}
const char* host = argv[1];
int port = atoi(argv[2]);
source_id = atoi(argv[3]);
run_number = atoi(argv[4]);
int socket = makeSocket();
connectTo(socket, host, port);
while (true) {
Header header;
header.sourceIdentifier = source_id;
header.runNumber = run_number;
header.detectorEventNumber = event_number;
event_number++;
header.numberOfStatusElements = 0;
//if swap to old style just change with function above and prob 0.1 for 1
uint8_t incorrectError = discrete_distr(generator);
uint8_t corruptedError = discrete_distr(generator);
uint8_t missingDataError = discrete_distr(generator);
uint8_t timeoutError = discrete_distr(generator);
uint32_t firstStatusElement = 0x0;
if (incorrectError) firstStatusElement |= INCORRECT_ERROR;
if (corruptedError) firstStatusElement |= CORRUPTED_ERROR;
if (missingDataError) firstStatusElement |= MISSING_DATA_ERROR;
if (timeoutError) firstStatusElement |= TIMEOUT_ERROR;
if (firstStatusElement != 0x0) {
header.numberOfStatusElements = 1;
header.statusElementsArray = new uint32_t[header.numberOfStatusElements];
header.statusElementsArray[0] = firstStatusElement;
}
uint32_t payload_size = std::max(std::min(static_cast<int>(std::round(normal_distr(generator))), MAX_FRAGMENTS), MIN_FRAGMENTS);
header.headerSize = 7 + header.numberOfStatusElements;
header.fragmentSize = header.headerSize + payload_size;
Fragment fragment;
fragment.header = header;
fragment.payloadElements = new uint32_t[payload_size];
for (uint32_t i = 0; i < payload_size; i++) {
fragment.payloadElements[i] = unif_int_distib(generator);
}
uint32_t buffer[header.fragmentSize];
encode_fragment(buffer, fragment);
ssize_t bytes = send(socket, reinterpret_cast<char*>(buffer), sizeof(buffer), 0);
if (bytes != header.fragmentSize * sizeof(uint32_t)) {
perror("Send failed: num bytes not matching");
exit(EXIT_FAILURE);
}
//std::cout << std::hex << std::setw(8) << std::setfill('0');
////std::cout << fragment.header;
//for (uint8_t i = 0; i < fragment.header.numberOfStatusElements; i++) {
// std::cout << "status element " << std::setw(8) << std::setfill('0') << fragment.header.statusElementsArray[i] << std::endl;
//}
//for (uint32_t i = 0; i < payload_size; i++) {
// std::cout << "payload element " << std::setw(8) << std::setfill('0') << fragment.payloadElements[i] << std::endl;
//}
//std::cout << std::endl;
if (header.numberOfStatusElements > 0){
delete [] fragment.header.statusElementsArray;
}
delete [] fragment.payloadElements;
sleep(unif_float_distrib(generator));
}
return 0;
}

View File

@ -0,0 +1,12 @@
#!/bin/bash
echo "Usage: $0 host port runNumber numberOfProviders"
if [ $# -eq 4 ]
then
echo "Selected host: $1:$2 runNumber: $3 numberOfProviders: $4"
for i in $(seq 1 $4);
do
echo "Spawning provider number $i"
./prov.out $1 $2 $3 &
done
fi

View File

@ -11,4 +11,3 @@ do
kill -15 `pidof ./prov.out`
sleep 1
done