Skip to content

Commit

Permalink
Implementation of pthread with FIFO order
Browse files Browse the repository at this point in the history
  • Loading branch information
FedeCana00 committed Sep 9, 2023
1 parent 6069844 commit c41ee8f
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 13 deletions.
31 changes: 22 additions & 9 deletions main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,25 @@
#include "src/QueueLib.h"
#include <pthread.h>
#include <cstdint> // to include type uintptr_t
#include <cstdlib> // to include rand()
#include <unistd.h> // to include usleep()

//QueueLib<int, DYNAMIC_MODE> queue(true);
QueueLib<int, STATIC_MODE> queue(true);
QueueLib<int, DYNAMIC_MODE> queue(true);
//QueueLib<int, STATIC_MODE> queue(true);

int delayMicroseconds = rand() % 900000 + 100000; // random delay between 100ms and 1000ms

void* threadFunction1(void* arg) {

void* threadFunction1(void*) {
uintptr_t tid = (uintptr_t) pthread_self();

std::cout << "Thread " << (int)tid << " in execution" << std::endl;

queue.push(1, tid);
usleep(delayMicroseconds);
queue.push(1, 1, tid);
queue.push(3, 3, tid);
usleep(delayMicroseconds);
queue.push(3, 1, tid);

int value = 0;
queue.pull(value, tid);
Expand All @@ -24,13 +31,16 @@ void* threadFunction1(void* arg) {
return NULL;
}

void* threadFunction2(void* arg) {
void* threadFunction2(void*) {
uintptr_t tid = (uintptr_t) pthread_self();

std::cout << "Thread " << (int)tid << " in execution" << std::endl;

queue.push(2, tid);
usleep(delayMicroseconds);
queue.push(2, 2, tid);
queue.push(4, 4, tid);
usleep(delayMicroseconds);
queue.push(4, 2, tid);

int value = 0;
queue.pull(value, tid);
Expand All @@ -41,13 +51,16 @@ void* threadFunction2(void* arg) {
return NULL;
}

void* threadFunction3(void* arg) {
void* threadFunction3(void*) {
uintptr_t tid = (uintptr_t) pthread_self();

std::cout << "Thread " << (int)tid << " in execution" << std::endl;

queue.push(5, 5, tid);
queue.push(6, 6, tid);
queue.push(3, tid);
usleep(delayMicroseconds);
queue.push(5, 3, tid);
usleep(delayMicroseconds);
queue.push(6, 3, tid);

int value = 0;
queue.pull(value, tid);
Expand Down
69 changes: 66 additions & 3 deletions src/QueueLib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,68 @@ template <typename T, bool isStatic>
QueueLib<T, isStatic>::QueueLib(bool isVerbose) {
queue.setVerbose(isVerbose);

// semaphore turnstile initialization
sem_unlink("/turnstile"); // if already exists
turnstile = sem_open("/turnstile", O_CREAT | O_EXCL, 0666, 1);
if(turnstile == SEM_FAILED){
std::cerr << "Error init of semaphore turnstile" << std::endl;
exit(EXIT_FAILURE);
}

// semaphore mutex initialization
sem_unlink("/mutex"); // if already exists
mutex = sem_open("/mutex", O_CREAT | O_EXCL, 0666, 1);
if(mutex == SEM_FAILED)
std::cout << "Error init of semaphore mutex" << std::endl;
if(mutex == SEM_FAILED){
std::cerr << "Error init of semaphore mutex" << std::endl;
sem_close(turnstile);
sem_unlink("/turnstile");
exit(EXIT_FAILURE);
}
}

template <typename T, bool isStatic>
QueueLib<T, isStatic>::~QueueLib() {
// semaphore turnstile destruction
sem_close(turnstile);
sem_unlink("/turnstile"); // remove the semaphore
// semaphore mutex destruction
sem_close(mutex);
sem_unlink("/mutex"); // remove the semaphore
}

template <typename T, bool isStatic>
void QueueLib<T, isStatic>::getMutex(int idThread){

sem_wait(turnstile);
if(queue.getVerbose())
std::cout << idThread << " -> I'm inside turnstile" << std::endl;
threadsWaiting++;

if(threadsWaiting > 1){
std::string semThreadName = "/semThread" + std::to_string(threadsWaiting);
sem_unlink(semThreadName.c_str()); // if already exists
sem_t* semThread = sem_open(semThreadName.c_str(), O_CREAT | O_EXCL, 0666, 1);

if(semThread == SEM_FAILED){
std::cerr << idThread << " -> Error init of semaphore " << semThreadName << std::endl;
exit(EXIT_FAILURE);
}

semList.push(semThread);

if(queue.getVerbose())
std::cout << idThread << " -> I'm going to wait" << std::endl;

sem_post(turnstile);
sem_wait(semList.getHead());
sem_wait(turnstile);

if(queue.getVerbose())
std::cout << idThread << " -> I'm awake" << std::endl;
}

sem_post(turnstile);

sem_wait(mutex);
if(queue.getVerbose())
std::cout << idThread << " -> I'm inside" << std::endl;
Expand All @@ -34,12 +80,29 @@ template <typename T, bool isStatic>
void QueueLib<T, isStatic>::releaseMutex(int idThread){
if(queue.getVerbose())
std::cout << idThread << " -> I'm going to release the MUTEX" << std::endl;

sem_post(mutex);

sem_wait(turnstile);
threadsWaiting--;
if(threadsWaiting > 0){
sem_t* semThread;
// semThread will be the head semaphore
semList.pull(semThread);
sem_post(semThread);

if(queue.getVerbose())
std::cout << idThread << " -> I woke up the first in line" << std::endl;

sem_close(semThread);
std::string semThreadName = "/semThread" + std::to_string(threadsWaiting);
sem_unlink(semThreadName.c_str()); // remove the semaphore
}
sem_post(turnstile);
}

template <typename T, bool isStatic>
void QueueLib<T, isStatic>::push(const T& data, int param, int idThread){

getMutex(idThread);
queue.push(data, param);
releaseMutex(idThread);
Expand Down
5 changes: 4 additions & 1 deletion src/QueueLib.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@ template <typename T, bool isStatic = true>
class QueueLib {
private:
typename std::conditional<isStatic, StaticLinkedList<T>, GroupsLinkedList<T>>::type queue;
sem_t* turnstile;
sem_t* mutex;
SinglyLinkedList<sem_t*> semList;
int threadsWaiting = 0;

/**
* Takes the mutex, if not available queues on the turnstile semaphore.
* If there are multiple threads waiting, queue on a specific semaphore, and when it is unlocked (in FIFO) it will make a WAIT on the mutex.
* @param idThread it's the thread ID.
*/
void getMutex(int idThread);
Expand Down

0 comments on commit c41ee8f

Please sign in to comment.