6.1. Portable Semaphore

Start cpp section to pthread/pthread_semaphore.hpp[1 /1 ]
     1: #line 1055 "./lpsrc/flx_pthread.pak"
     2: #ifndef __SEMAPHORE__
     3: #define __SEMAPHORE__
     4: #include <flx_pthread_config.hpp>
     5: #include "pthread_mutex.hpp"
     6: 
     7: #include "pthread_win_posix_condv_emul.hpp"
     8: 
     9: namespace flx { namespace pthread {
    10: 
    11: // ********************************************************
    12: /// Semaphore
    13: // ********************************************************
    14: class PTHREAD_EXTERN flx_semaphore_t {
    15:   sem_t sem;                         // the wrapped semaphore object
    16: public:
    17:   flx_semaphore_t(int n=0);          // initialises sem with count n (default 0)
    18:   ~flx_semaphore_t();                // destroys sem
    19:   void post();                       // sem_post on the wrapped semaphore
    20:   void operator++() { post(); }      // prefix ++ is shorthand for post()
    21:   void wait();                       // sem_wait on the wrapped semaphore
    22:   void operator--() { wait(); }      // prefix -- is shorthand for wait()
    23:   int get();                         // current count as reported by sem_getvalue
    24:   int operator*() { return get(); }  // unary * is shorthand for get()
    25: 
    26:   /// Atomic test-and-decrement: decrements the count only if it is non-zero.
    27:   /// Documented to return EAGAIN on failure to decrement. NOTE(review): the implementation forwards sem_trywait()'s result unchanged -- confirm that value is EAGAIN and not -1 with errno set.
    28:   int trywait();
    29: };
    30: 
    31: }} // namespace pthread, flx
    32: #endif
    33: 
End cpp section to pthread/pthread_semaphore.hpp[1]
Start cpp section to pthread/pthread_semaphore.cpp[1 /1 ]
     1: #line 1089 "./lpsrc/flx_pthread.pak"
     2: #include "pthread_semaphore.hpp"
     3: #include <stdio.h>        // printf debugging
     4: #include <assert.h>
     5: 
     6: namespace flx { namespace pthread {
     7: flx_semaphore_t::flx_semaphore_t(int n) { sem_init(&sem, 0, n); } // initial count n; second sem_init argument is 0; sem_init's result is not checked
     8: flx_semaphore_t::~flx_semaphore_t() { sem_destroy(&sem); } // releases the semaphore; sem_destroy's result is not checked
     9: void flx_semaphore_t::wait() { sem_wait(&sem); } // result discarded. NOTE(review): POSIX sem_wait can fail with EINTR without decrementing -- confirm callers tolerate that
    10: int flx_semaphore_t::trywait() { return sem_trywait(&sem); } // forwards sem_trywait's result as-is. NOTE(review): POSIX reports EAGAIN via errno with a -1 return -- confirm against the header's "returns EAGAIN"
    11: void flx_semaphore_t::post() { sem_post(&sem); } // increments the count via sem_post; result is not checked
    12: int flx_semaphore_t::get(){ int x = 0; sem_getvalue(&sem,&x); return x; } // returns the count from sem_getvalue; x starts at 0 so a failed call yields 0 instead of an indeterminate value
    13: 
    14: }}
    15: 
End cpp section to pthread/pthread_semaphore.cpp[1]
Start cpp section to pthread/pthread_monitor.hpp[1 /1 ]
     1: #line 1105 "./lpsrc/flx_pthread.pak"
     2: #ifndef __MONITOR__
     3: #define __MONITOR__
     4: #include <flx_pthread_config.hpp>
     5: #include "pthread_mutex.hpp"
     6: #include "pthread_condv.hpp"
     7: #include "pthread_semaphore.hpp"
     8: 
     9: // interface for a consumer/producer queue. threads requesting a resource
    10: // that isn't there block until one is available. push/pop re-entrant
    11: 
    12: namespace flx { namespace pthread {
    13: 
    14: // ********************************************************
    15: /// A monitor is a concurrent version of a channel.
    16: /// It matches up readers and writers in pairs,
    17: /// synchronising transfer of one datum.
    18: ///
    19: /// Unlike the bounded queue below, a monitor is a fully
    20: /// synchronised unbuffered transfer, mediated by a full
    21: /// handshake.
    22: ///
    23: /// In particular, unlike the queue of size 1, the writer
    24: /// cannot proceed until the reader sends an acknowledge
    25: /// signal.
    26: ///
    27: /// This logic matches that provided by schannels, but
    28: /// across an asynchronous boundary.
    29: // ********************************************************
    30: 
    31: class PTHREAD_EXTERN monitor_t {
    32:   flx_mutex_t m;      // locked by enqueue() and dequeue() around the handshakes; also the mutex handed to ack.wait()
    33:   flx_mutex_t rm;     // locked for the whole of dequeue() to exclude other readers
    34:   flx_mutex_t wm;     // locked for the whole of enqueue() to exclude other writers
    35:   int dataput;        // first handshake counter: incremented by enqueue(), decremented by dequeue(); starts at 0
    36:   int datagot;        // second handshake counter, same protocol, used after the reader has copied data
    37:   flx_condv_t ack;    // the one condition variable both handshakes wait on and signal
    38:   void *data;         // datum in transit: stored by enqueue(), read by dequeue()
    39: public:
    40:   monitor_t();
    41:   ~monitor_t();
    42:   void enqueue(void*);   // writer side: hand over one datum and return after both handshakes
    43:   void* dequeue();       // reader side: return the datum stored by a matching enqueue()
    44: };
    45: 
    46: }} // namespace pthread, flx
    47: #endif
    48: 
End cpp section to pthread/pthread_monitor.hpp[1]
Start cpp section to pthread/pthread_monitor.cpp[1 /1 ]
     1: #line 1154 "./lpsrc/flx_pthread.pak"
     2: #include "pthread_monitor.hpp"
     3: #include <queue>        // stl to the bloated rescue
     4: #include <string.h>       // strerror
     5: #include <assert.h>
     6: 
     7: using namespace std;
     8: 
     9: namespace flx { namespace pthread {
    10: 
    11: monitor_t::monitor_t() : dataput(0),datagot(0),data(0) {} // both handshake counters start balanced at 0; data starts null rather than indeterminate
    12: monitor_t::~monitor_t() { } // nothing to do here; members are destroyed by their own destructors
    13: inline static void handshake_pos(int &a, flx_condv_t &c, flx_mutex_t &m)
    14: {
    15:   // Counting-up half of a two-party rendezvous on a; both callers in this file hold m when calling.
    16:   if (++a == 0) c.signal();                  // counter balanced: the other party arrived first, so signal it
    17:   else do { c.wait(&m); } while (a != 0);    // arrived first: wait on c, re-checking a after every wake-up
    18:   assert(a == 0);                            // on exit the counter is balanced again
    19:   //if(!(a == 0)) fprintf(stderr,"ASSER FAIL\n");
    20: }
    21: 
    22: inline static void handshake_neg(int &a, flx_condv_t &c, flx_mutex_t &m)
    23: {
    24:   // Counting-down half of a two-party rendezvous on a; both callers in this file hold m when calling.
    25:   if (--a == 0) c.signal();                  // counter balanced: the other party arrived first, so signal it
    26:   else do { c.wait(&m); } while (a != 0);    // arrived first: wait on c, re-checking a after every wake-up
    27:   assert(a == 0);                            // on exit the counter is balanced again
    28:   //if(!(a == 0)) fprintf(stderr,"ASSER FAIL\n");
    29: }
    30: 
    31: // Writer side of the monitor: store elt, then complete both handshakes with a reader before returning.
    32: void monitor_t::enqueue(void* elt)
    33: {
    34:   flx_mutex_locker_t   writer_gate(wm);   // exclude other writers
    35:   flx_mutex_locker_t   state_lock(m);     // taken after wm; the same mutex is passed to the handshakes
    36:   data = elt;                             // store the datum before announcing it
    37:   handshake_pos(dataput, ack, m);         // first handshake, on dataput
    38:   handshake_pos(datagot, ack, m);         // second handshake, on datagot
    39: }
    40: 
    41: // Reader side of the monitor: complete the first handshake, copy the datum, then complete the second.
    42: void* monitor_t::dequeue()
    43: {
    44:   flx_mutex_locker_t   reader_gate(rm);   // exclude other readers
    45:   flx_mutex_locker_t   state_lock(m);     // taken after rm; the same mutex is passed to the handshakes
    46:   handshake_neg(dataput, ack, m);         // first handshake, on dataput
    47:   void *taken = data;                     // get the data before the second handshake
    48:   handshake_neg(datagot, ack, m);         // second handshake, on datagot
    49:   return taken;
    50: }
    51: 
    52: }}
    53: 
End cpp section to pthread/pthread_monitor.cpp[1]