4. Scheduler

Start cpp section to faio/faio_drv.hpp[1 /1 ]
     1: #line 723 "./lpsrc/flx_faio.pak"
     2: #ifndef __FLXDRV__
     3: #define __FLXDRV__
     4: #include <flx_faio_config.hpp>
     5: 
     6: #include "pthread_sleep_queue.hpp"
     7: #include "pthread_work_fifo.hpp"
     8: #include "demux_timer_queue.hpp"
     9: #include "demux_demuxer.hpp"
    10: 
    11: namespace flx { namespace faio {
    12: // vestigal driver class is all that remains of the embedded driver from
    13: // which faio came. all it is now is a pointer to a queue.
    14: // watch this space.
    15: class FAIO_EXTERN flx_drv {
    16:     flx::pthread::sleep_queue_t&    ready_queue;
    17:     flx::pthread::worker_fifo     work_fifo;
    18: public:
    19:     flx_drv(
    20:       flx::pthread::sleep_queue_t& q,
    21:       int qbound, int nthreads
    22:     );
    23:     virtual ~flx_drv();
    24: 
    25:     void sched(void* f);
    26: 
    27:     flx::pthread::worker_fifo* get_worker_fifo() { return &work_fifo; }
    28:     virtual demux::timer_queue* get_sleepers() = 0;
    29: };
    30: 
    31: // this becomes socket thread wakeup. um.
    32: class FAIO_EXTERN thread_wakeup {
    33: public:
    34:     void wake() { drv->sched(f); }
    35: 
    36:     void*           f;              // thread to be woken
    37:     flx_drv*        drv;            // in which driver
    38: };
    39: 
    40: // to be called inside every start_async_op
    41: // note: the windows version often needs the demuxer, but the posix one
    42: // doesn't. I forget why.
    43: #define RECORD_THREAD_INFO(w) (w).f=f;\
    44:                               (w).drv=drv;
    45: 
    46: }}
    47: #endif              //__FLXDRV__
    48: 
    49: 
End cpp section to faio/faio_drv.hpp[1]
Start cpp section to faio/faio_drv.cpp[1 /1 ]
     1: #line 773 "./lpsrc/flx_faio.pak"
     2: #include <stdio.h>              // debug printf
     3: #include "faio_drv.hpp"
     4: 
     5: using namespace flx::demux;
     6: namespace flx { namespace faio {
     7: flx_drv::flx_drv(flx::pthread::sleep_queue_t& q, int jobqbound, int nthreads)
     8:     : ready_queue(q), work_fifo(jobqbound,nthreads)
     9: {
    10: }
    11: 
    12: flx_drv::~flx_drv()
    13: {
    14: }
    15: 
    16: void
    17: flx_drv::sched(void* f)
    18: {
    19:     ready_queue.enqueue(f);
    20: }
    21: 
    22: }}
    23: 
End cpp section to faio/faio_drv.cpp[1]
Start cpp section to faio/faio_pdrv.hpp[1 /1 ]
     1: #line 797 "./lpsrc/flx_faio.pak"
     2: #ifndef __FAIO_PDRV__
     3: #define __FAIO_PDRV__
     4: 
     5: #include <flx_faio_config.hpp>
     6: #include "demux_pfileio.hpp"
     7: #include "demux_posix_timer_queue.hpp"
     8: #include "faio_drv.hpp"
     9: namespace flx { namespace faio {
    10: 
    11: // same as flx_drv, but with an async file io worker fifo.
    12: class FAIO_EXTERN pflx_drv : public flx_drv {
    13:     demux::pasync_fileio       file_aio_worker;
    14:     demux::posix_timer_queue   sleepers;
    15: public:
    16:     pflx_drv(flx::pthread::sleep_queue_t& q, int n1, int m2, int n2, int m2);
    17: 
    18:     demux::pasync_fileio* get_aio_worker() { return &file_aio_worker; }
    19:     demux::timer_queue* get_sleepers() { return &sleepers; }
    20:     bool debug;
    21:     void set_debug(bool d) { debug = d; }
    22: };
    23: }}
    24: #endif
    25: 
End cpp section to faio/faio_pdrv.hpp[1]
Start cpp section to faio/faio_pdrv.cpp[1 /1 ]
     1: #line 823 "./lpsrc/flx_faio.pak"
     2: #include "faio_pdrv.hpp"
     3: #include <signal.h>     // for SIGPIPE portable ignoring
     4: #include <stdio.h>      // printf
     5: 
     6: using namespace flx::demux;
     7: namespace flx { namespace faio {
     8: 
     9: // what is this stuff?? 3 queues needed?
    10: // YUP! The job queue schedules jobs, the demux queue
    11: // gets the return events, and the sleep queue is to
    12: // put woken up felix threads on so they end up later
    13: // in the active queue (that's a fourth queue .. :))
    14: 
    15: pflx_drv::pflx_drv(flx::pthread::sleep_queue_t& q, int n1, int m1, int n2, int m2)
    16:     : flx_drv(q,n1,m1), file_aio_worker(n2,m2)
    17: {
    18:     // we might actually like to ignore SIGPIPE's conditionally,
    19:     // e.g. when a flx prog actually requests socket io, we could even
    20:     // transform it into a flx level signal (if those exist, signals are
    21:     // pretty lame & don't work well for library code - would you want them
    22:     // to exist?) (osx has a per socket sockopt that does this, linux has
    23:     // the send flag MSG_NOSIGNAL)
    24: 
    25:     // this is actually demux/flxasync io's problem, so shift it there
    26:     if(debug)fprintf(stderr,"pdrv installing SIGPIPE ignorer\n");
    27:     //if(debug)fprintf(stderr,"OSX/bsd only: setsockopt(SO_NOSIGPIPE) -> EPIPE\n");
    28:     // sig_t   prev_handler;
    29:     void (*prev_handler)(int);  // solaris is FUN.
    30:     prev_handler = signal(SIGPIPE, SIG_IGN);
    31: 
    32:     if(SIG_ERR == prev_handler)
    33:     {
    34:         fprintf(stderr, "failed to install SIGPIPE ignorer\n");
    35:         throw -1;
    36:     }
    37:     else if(prev_handler != SIG_IGN && prev_handler != SIG_DFL)
    38:     {
    39:         fprintf(stderr,"warning: blew away prev SIGPIPE handler: %p\n",
    40:             prev_handler);
    41:     }
    42: }
    43: 
    44: }}
    45: 
End cpp section to faio/faio_pdrv.cpp[1]