diff --git a/main.cpp b/main.cpp index 8eb55d7..3f24934 100644 --- a/main.cpp +++ b/main.cpp @@ -2,18 +2,25 @@ #include "src/QueueLib.h" #include #include // to include type uintptr_t +#include // to include rand() +#include // to include usleep() -//QueueLib queue(true); -QueueLib queue(true); +QueueLib queue(true); +//QueueLib queue(true); +int delayMicroseconds = rand() % 900000 + 100000; // random delay between 100ms and 1000ms -void* threadFunction1(void* arg) { + +void* threadFunction1(void*) { uintptr_t tid = (uintptr_t) pthread_self(); std::cout << "Thread " << (int)tid << " in execution" << std::endl; + queue.push(1, tid); + usleep(delayMicroseconds); queue.push(1, 1, tid); - queue.push(3, 3, tid); + usleep(delayMicroseconds); + queue.push(3, 1, tid); int value = 0; queue.pull(value, tid); @@ -24,13 +31,16 @@ void* threadFunction1(void* arg) { return NULL; } -void* threadFunction2(void* arg) { +void* threadFunction2(void*) { uintptr_t tid = (uintptr_t) pthread_self(); std::cout << "Thread " << (int)tid << " in execution" << std::endl; + queue.push(2, tid); + usleep(delayMicroseconds); queue.push(2, 2, tid); - queue.push(4, 4, tid); + usleep(delayMicroseconds); + queue.push(4, 2, tid); int value = 0; queue.pull(value, tid); @@ -41,13 +51,16 @@ void* threadFunction2(void* arg) { return NULL; } -void* threadFunction3(void* arg) { +void* threadFunction3(void*) { uintptr_t tid = (uintptr_t) pthread_self(); std::cout << "Thread " << (int)tid << " in execution" << std::endl; - queue.push(5, 5, tid); - queue.push(6, 6, tid); + queue.push(3, tid); + usleep(delayMicroseconds); + queue.push(5, 3, tid); + usleep(delayMicroseconds); + queue.push(6, 3, tid); int value = 0; queue.pull(value, tid); diff --git a/src/QueueLib.cpp b/src/QueueLib.cpp index 6bf622e..3258a9c 100644 --- a/src/QueueLib.cpp +++ b/src/QueueLib.cpp @@ -9,15 +9,30 @@ template QueueLib::QueueLib(bool isVerbose) { queue.setVerbose(isVerbose); + // semaphore turnstile initialization + sem_unlink("/turnstile"); // if already exists + turnstile = sem_open("/turnstile", O_CREAT | O_EXCL, 0666, 1); + if(turnstile == SEM_FAILED){ + std::cerr << "Error init of semaphore turnstile" << std::endl; + exit(EXIT_FAILURE); + } + // semaphore mutex initialization sem_unlink("/mutex"); // if already exists mutex = sem_open("/mutex", O_CREAT | O_EXCL, 0666, 1); - if(mutex == SEM_FAILED) - std::cout << "Error init of semaphore mutex" << std::endl; + if(mutex == SEM_FAILED){ + std::cerr << "Error init of semaphore mutex" << std::endl; + sem_close(turnstile); + sem_unlink("/turnstile"); + exit(EXIT_FAILURE); + } } template QueueLib::~QueueLib() { + // semaphore turnstile destruction + sem_close(turnstile); + sem_unlink("/turnstile"); // remove the semaphore // semaphore mutex destruction sem_close(mutex); sem_unlink("/mutex"); // remove the semaphore @@ -25,6 +40,37 @@ QueueLib::~QueueLib() { template void QueueLib::getMutex(int idThread){ + + sem_wait(turnstile); + if(queue.getVerbose()) + std::cout << idThread << " -> I'm inside turnstile" << std::endl; + threadsWaiting++; + + if(threadsWaiting > 1){ + std::string semThreadName = "/semThread" + std::to_string(threadsWaiting); + sem_unlink(semThreadName.c_str()); // if already exists + sem_t* semThread = sem_open(semThreadName.c_str(), O_CREAT | O_EXCL, 0666, 1); + + if(semThread == SEM_FAILED){ + std::cerr << idThread << " -> Error init of semaphore " << semThreadName << std::endl; + exit(EXIT_FAILURE); + } + + semList.push(semThread); + + if(queue.getVerbose()) + std::cout << idThread << " -> I'm going to wait" << std::endl; + + sem_post(turnstile); + sem_wait(semList.getHead()); + sem_wait(turnstile); + + if(queue.getVerbose()) + std::cout << idThread << " -> I'm awake" << std::endl; + } + + sem_post(turnstile); + sem_wait(mutex); if(queue.getVerbose()) std::cout << idThread << " -> I'm inside" << std::endl; @@ -34,12 +80,29 @@ template void QueueLib::releaseMutex(int idThread){ if(queue.getVerbose()) std::cout << idThread << " -> I'm going to release the MUTEX" << std::endl; + sem_post(mutex); + + sem_wait(turnstile); + threadsWaiting--; + if(threadsWaiting > 0){ + sem_t* semThread; + // semThread will be the head semaphore + semList.pull(semThread); + sem_post(semThread); + + if(queue.getVerbose()) + std::cout << idThread << " -> I woke up the first in line" << std::endl; + + sem_close(semThread); + std::string semThreadName = "/semThread" + std::to_string(threadsWaiting); + sem_unlink(semThreadName.c_str()); // remove the semaphore + } + sem_post(turnstile); } template void QueueLib::push(const T& data, int param, int idThread){ - getMutex(idThread); queue.push(data, param); releaseMutex(idThread); diff --git a/src/QueueLib.h b/src/QueueLib.h index 8f2ca1f..40cbb27 100644 --- a/src/QueueLib.h +++ b/src/QueueLib.h @@ -16,10 +16,13 @@ template class QueueLib { private: typename std::conditional, GroupsLinkedList>::type queue; + sem_t* turnstile; sem_t* mutex; + SinglyLinkedList semList; + int threadsWaiting = 0; /** - * Takes the mutex, if not available queues on the turnstile semaphore. + * If there are multiple threads waiting, queue on a specific semaphore, and when it is unlocked (in FIFO) it will make a WAIT on the mutex. * @param idThread it's the thread ID. */ void getMutex(int idThread);